Whamcloud - gitweb
LU-3590 lfsck: repair MDT-object with dangling reference 17/7517/28
authorFan Yong <fan.yong@intel.com>
Wed, 5 Feb 2014 17:46:42 +0000 (01:46 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 22 Feb 2014 18:34:09 +0000 (18:34 +0000)
If the OST-object referenced by the MDT-object is lost, then the
LFSCK needs to recreate the OST-object with the specified FID and
initialize it with the given parent MDT-object FID and owner attr.
Although the newly created OST-object is initialized, the SUID+SGID
mode will be kept on it; those bits are dropped by the first modification
RPC, such as write/punch/setattr. That lets us distinguish whether the
recreated OST-object has been modified or not.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ic45254695e7b1902020c133bb23fd32685b9a414
Reviewed-on: http://review.whamcloud.com/7517
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/obd_support.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/ofd/ofd_objects.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/tests/sanity-lfsck.sh

index 369cde6..f47c845 100644 (file)
@@ -501,6 +501,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_SKIP_LASTID     0x160d
 #define OBD_FAIL_LFSCK_DELAY4          0x160e
 #define OBD_FAIL_LFSCK_BAD_LMMOI       0x160f
+#define OBD_FAIL_LFSCK_DANGLING        0x1610
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index a97947a..8862f83 100644 (file)
@@ -525,6 +525,7 @@ struct lfsck_thread_info {
        struct lu_fid           lti_fid;
        struct lu_fid           lti_fid2;
        struct lu_attr          lti_la;
+       struct lu_attr          lti_la2;
        struct ost_id           lti_oi;
        union {
                struct lustre_mdt_attrs lti_lma;
@@ -541,7 +542,11 @@ struct lfsck_thread_info {
        struct lfsck_stop       lti_stop;
        ldlm_policy_data_t      lti_policy;
        struct ldlm_res_id      lti_resid;
-       struct filter_fid_old   lti_pfid;
+       union {
+               struct filter_fid_old   lti_old_pfid;
+               struct filter_fid       lti_new_pfid;
+       };
+       struct dt_allocation_hint lti_hint;
 };
 
 /* lfsck_lib.c */
@@ -604,6 +609,11 @@ extern const char *lfsck_flags_names[];
 extern const char *lfsck_param_names[];
 extern struct lu_context_key lfsck_thread_key;
 
+/* Return the dt_device that embeds (as dd_lu_dev) the lu_device
+ * referenced by @obj's lu_object, i.e. obj->do_lu.lo_dev. */
+static inline struct dt_device *lfsck_obj2dt_dev(struct dt_object *obj)
+{
+       struct lu_device *lu_dev = obj->do_lu.lo_dev;
+
+       return container_of0(lu_dev, struct dt_device, dd_lu_dev);
+}
+
 static inline struct lfsck_thread_info *
 lfsck_env_info(const struct lu_env *env)
 {
index 3ee9592..15e9c05 100644 (file)
@@ -1368,6 +1368,22 @@ static void lfsck_layout_unlock(struct lustre_handle *lh)
        }
 }
 
+/* Stop @handle on @dev and translate the outcome into the status
+ * convention used by the layout LFSCK repair paths:
+ *
+ *   < 0  failure, either from dt_trans_stop() or from the caller's
+ *        @result;
+ *   0    nothing was repaired (@result or dt_trans_stop() was positive);
+ *   1    the caller had no error and the transaction stopped cleanly.
+ *
+ * @result is stored in handle->th_result before stopping, presumably so
+ * the lower layer can react to it. It is then checked explicitly rather
+ * than trusting dt_trans_stop() to hand it back: a device whose
+ * trans_stop returns 0 regardless of th_result would otherwise turn a
+ * failed or skipped repair into a "repaired" status. */
+static int lfsck_layout_trans_stop(const struct lu_env *env,
+                                  struct dt_device *dev,
+                                  struct thandle *handle, int result)
+{
+       int rc;
+
+       handle->th_result = result;
+       rc = dt_trans_stop(env, dev, handle);
+       /* A failure to stop the transaction always wins. */
+       if (rc < 0)
+               return rc;
+
+       /* The caller already failed (< 0) or decided to skip (> 0). */
+       if (result != 0)
+               return result < 0 ? result : 0;
+
+       /* Clean caller result: keep the existing mapping, where a
+        * positive value from dt_trans_stop() means "not repaired". */
+       return rc > 0 ? 0 : 1;
+}
+
 static int lfsck_layout_scan_orphan(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct lfsck_tgt_desc *ltd)
@@ -1377,6 +1393,181 @@ static int lfsck_layout_scan_orphan(const struct lu_env *env,
        return 0;
 }
 
+/* For the MDT-object with dangling reference, we need to re-create
+ * the missed OST-object with the known FID/owner information.
+ *
+ * Sequence, as implemented below:
+ * 1) lock the parent through lfsck_layout_lock() with
+ *    MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR;
+ * 2) create a transaction on the device obtained from the child via
+ *    lfsck_obj2dt_dev();
+ * 3) fill a filter_fid in the per-thread scratch area with the parent
+ *    FID (sequence and oid) and store the stripe index llr_lov_idx in
+ *    ff_parent.f_ver, all converted with cpu_to_le*();
+ * 4) declare the create and the XATTR_NAME_FID xattr set (with
+ *    LU_XATTR_CREATE), then start the transaction;
+ * 5) under the parent's dt read lock, leave with rc = 1 and without
+ *    creating anything if the parent is dying; otherwise create the
+ *    child with the attributes in @la and set the xattr;
+ * 6) hand rc to lfsck_layout_trans_stop() and drop the lock from 1).
+ *
+ * @la supplies the attributes used for the new object.
+ *
+ * Returns the lfsck_layout_lock() or dt_trans_create() error if either
+ * of them fails, otherwise whatever lfsck_layout_trans_stop() returns. */
+static int lfsck_layout_recreate_ostobj(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct lfsck_layout_req *llr,
+                                       struct lu_attr *la)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct filter_fid               *pfid   = &info->lti_new_pfid;
+       struct dt_allocation_hint       *hint   = &info->lti_hint;
+       struct dt_object                *parent = llr->llr_parent->llo_obj;
+       struct dt_object                *child  = llr->llr_child;
+       struct dt_device                *dev    = lfsck_obj2dt_dev(child);
+       const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
+       struct thandle                  *handle;
+       struct lu_buf                   *buf;
+       struct lustre_handle             lh     = { 0 };
+       int                              rc;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "Repair dangling reference for: parent "DFID
+              ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
+              PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
+              llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid);
+
+       rc = lfsck_layout_lock(env, com, parent, &lh,
+                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       if (rc != 0)
+               RETURN(rc);
+
+       handle = dt_trans_create(env, dev);
+       if (IS_ERR(handle))
+               GOTO(unlock1, rc = PTR_ERR(handle));
+
+       hint->dah_parent = NULL;
+       hint->dah_mode = 0;
+       pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
+       pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
+       pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
+       buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
+
+       rc = dt_declare_create(env, child, la, hint, NULL, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
+                                 LU_XATTR_CREATE, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start(env, dev, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_read_lock(env, parent, 0);
+       if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
+               GOTO(unlock2, rc = 1);
+
+       rc = dt_create(env, child, la, hint, NULL, handle);
+       if (rc != 0)
+               GOTO(unlock2, rc);
+
+       rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
+                         handle, BYPASS_CAPA);
+
+       GOTO(unlock2, rc);
+
+unlock2:
+       dt_read_unlock(env, parent);
+
+stop:
+       rc = lfsck_layout_trans_stop(env, dev, handle, rc);
+
+unlock1:
+       lfsck_layout_unlock(&lh);
+
+       return rc;
+}
+
+/* Verify one parent/child pair queued for the layout assistant and
+ * repair it when needed.
+ *
+ * The parent and child attributes are fetched first. If the parent is
+ * dying the request is dropped with 0 and no statistics are touched.
+ * A child that does not exist (-ENOENT) is a dangling reference. In
+ * dry-run mode an inconsistency is only reported (rc = 1); otherwise
+ * the missing OST-object is re-created with the parent's uid/gid.
+ *
+ * The result is accounted under com->lc_sem: connection-type errors
+ * mark the scan LF_INCOMPLETE, count the object as skipped and are
+ * turned into 0; other errors count as phase1 failures; a positive
+ * result bumps the repaired counter for the detected type.
+ *
+ * Returns the (possibly adjusted) rc described above. */
+static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
+                                            struct lfsck_component *com,
+                                            struct lfsck_layout_req *llr)
+{
+       struct lfsck_thread_info             *info   = lfsck_env_info(env);
+       struct lfsck_layout                  *lo     = com->lc_file_ram;
+       struct lfsck_bookmark                *bk     =
+                                       &com->lc_lfsck->li_bookmark_ram;
+       struct dt_object                     *parent = llr->llr_parent->llo_obj;
+       struct dt_object                     *child  = llr->llr_child;
+       struct lu_attr                       *p_attr = &info->lti_la;
+       struct lu_attr                       *c_attr = &info->lti_la2;
+       enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
+       int                                   rc;
+       ENTRY;
+
+       rc = dt_attr_get(env, parent, p_attr, BYPASS_CAPA);
+       if (rc != 0) {
+               /* A parent on its way out is not worth reporting. */
+               if (lu_object_is_dying(parent->do_lu.lo_header))
+                       RETURN(0);
+
+               GOTO(out, rc);
+       }
+
+       rc = dt_attr_get(env, child, c_attr, BYPASS_CAPA);
+       if (rc == -ENOENT) {
+               if (lu_object_is_dying(parent->do_lu.lo_header))
+                       RETURN(0);
+
+               /* The referenced OST-object is missing. */
+               type = LLIT_DANGLING;
+       } else if (rc != 0) {
+               GOTO(out, rc);
+       }
+
+       /* XXX: other inconsistency will be checked in other patches;
+        *      such checks only apply when the child was found. */
+
+       /* Dry-run: report what was found, change nothing. */
+       if (bk->lb_param & LPF_DRYRUN)
+               GOTO(out, rc = (type != LLIT_NONE) ? 1 : 0);
+
+       switch (type) {
+       case LLIT_DANGLING:
+               /* Start from zeroed attributes: only type/mode and the
+                * parent's owner are filled in, the times stay zero. */
+               memset(c_attr, 0, sizeof(*c_attr));
+               c_attr->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
+                                  LA_ATIME | LA_MTIME | LA_CTIME;
+               c_attr->la_mode = S_IFREG | 0666;
+               c_attr->la_uid = p_attr->la_uid;
+               c_attr->la_gid = p_attr->la_gid;
+               rc = lfsck_layout_recreate_ostobj(env, com, llr, c_attr);
+               break;
+       case LLIT_UNMATCHED_PAIR:
+       case LLIT_MULTIPLE_REFERENCED:
+       case LLIT_INCONSISTENT_OWNER:
+               /* XXX: other inconsistency will be fixed in other patches. */
+               break;
+       default:
+               rc = 0;
+               break;
+       }
+
+       GOTO(out, rc);
+
+out:
+       down_write(&com->lc_sem);
+       if (rc > 0) {
+               LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
+                        "unknown type = %d\n", type);
+
+               lo->ll_objs_repaired[type - 1]++;
+       } else if (rc < 0) {
+               switch (rc) {
+               case -ENOTCONN:
+               case -ESHUTDOWN:
+               case -ETIMEDOUT:
+               case -EHOSTDOWN:
+               case -EHOSTUNREACH:
+                       /* If cannot touch the target server,
+                        * mark the LFSCK as INCOMPLETE. */
+                       lo->ll_flags |= LF_INCOMPLETE;
+                       lo->ll_objs_skipped++;
+                       rc = 0;
+                       break;
+               default:
+                       lo->ll_objs_failed_phase1++;
+                       break;
+               }
+       }
+       up_write(&com->lc_sem);
+
+       return rc;
+}
+
 static int lfsck_layout_assistant(void *args)
 {
        struct lfsck_thread_args        *lta     = args;
@@ -1432,15 +1623,16 @@ static int lfsck_layout_assistant(void *args)
                        if (unlikely(llmd->llmd_exit))
                                GOTO(cleanup1, rc = llmd->llmd_post_result);
 
-                       /* XXX: To be extended in other patch.
-                        *
-                        * Compare the OST side attribute with local attribute,
-                        * and fix it if found inconsistency. */
-
-                       spin_lock(&llmd->llmd_lock);
                        llr = list_entry(llmd->llmd_req_list.next,
                                         struct lfsck_layout_req,
                                         llr_list);
+                       /* Only the lfsck_layout_assistant thread itself can
+                        * remove the "llr" from the head of the list, LFSCK
+                        * engine thread only inserts other new "llr" at the
+                        * end of the list. So it is safe to handle current
+                        * "llr" without the spin_lock. */
+                       rc = lfsck_layout_assistant_handle_one(env, com, llr);
+                       spin_lock(&llmd->llmd_lock);
                        list_del_init(&llr->llr_list);
                        if (bk->lb_async_windows != 0 &&
                            llmd->llmd_prefetched >= bk->lb_async_windows)
@@ -1452,6 +1644,8 @@ static int lfsck_layout_assistant(void *args)
                                wake_up_all(&mthread->t_ctl_waitq);
 
                        lfsck_layout_req_fini(env, llr);
+                       if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+                               GOTO(cleanup1, rc);
                }
 
                /* Wakeup the master engine if it is waiting in checkpoint. */
@@ -1564,6 +1758,9 @@ orphan:
 cleanup1:
        /* Cleanup the unfinished requests. */
        spin_lock(&llmd->llmd_lock);
+       if (rc < 0)
+               llmd->llmd_assistant_status = rc;
+
        while (!list_empty(&llmd->llmd_req_list)) {
                llr = list_entry(llmd->llmd_req_list.next,
                                 struct lfsck_layout_req,
@@ -2152,7 +2349,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
        __u16                            gen;
        ENTRY;
 
-       buf = lfsck_buf_get(env, &info->lti_pfid,
+       buf = lfsck_buf_get(env, &info->lti_old_pfid,
                            sizeof(struct filter_fid_old));
        count = le16_to_cpu(lmm->lmm_stripe_count);
        gen = le16_to_cpu(lmm->lmm_layout_gen);
index 6458498..f6fd068 100644 (file)
@@ -295,7 +295,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                fo = batch[i];
                LASSERT(fo);
 
-               if (likely(!ofd_object_exists(fo))) {
+               if (likely(!ofd_object_exists(fo) &&
+                          !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
                        next = ofd_object_child(fo);
                        LASSERT(next != NULL);
 
index 383a3d1..d871528 100644 (file)
@@ -497,6 +497,16 @@ int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th);
 
+/* osp_md_object.c */
+int osp_md_declare_object_create(const struct lu_env *env,
+                                struct dt_object *dt,
+                                struct lu_attr *attr,
+                                struct dt_allocation_hint *hint,
+                                struct dt_object_format *dof,
+                                struct thandle *th);
+int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
+                        struct lu_attr *attr, struct dt_allocation_hint *hint,
+                        struct dt_object_format *dof, struct thandle *th);
 /* osp_precreate.c */
 int osp_init_precreate(struct osp_device *d);
 int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
index a39cdbc..6ef3019 100644 (file)
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
-static int osp_md_declare_object_create(const struct lu_env *env,
-                                       struct dt_object *dt,
-                                       struct lu_attr *attr,
-                                       struct dt_allocation_hint *hint,
-                                       struct dt_object_format *dof,
-                                       struct thandle *th)
+int osp_md_declare_object_create(const struct lu_env *env,
+                                struct dt_object *dt,
+                                struct lu_attr *attr,
+                                struct dt_allocation_hint *hint,
+                                struct dt_object_format *dof,
+                                struct thandle *th)
 {
        struct osp_thread_info  *osi = osp_env_info(env);
        struct update_request   *update;
@@ -62,7 +62,6 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        }
 
        osi->osi_obdo.o_valid = 0;
-       LASSERT(S_ISDIR(attr->la_mode));
        obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
        lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
        obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
@@ -132,11 +131,9 @@ out:
        return rc;
 }
 
-static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
-                               struct lu_attr *attr,
-                               struct dt_allocation_hint *hint,
-                               struct dt_object_format *dof,
-                               struct thandle *th)
+int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
+                        struct lu_attr *attr, struct dt_allocation_hint *hint,
+                        struct dt_object_format *dof, struct thandle *th)
 {
        struct osp_object  *obj = dt2osp_obj(dt);
 
@@ -147,7 +144,8 @@ static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
         * if creation reaches here, it means the object has been created
         * successfully */
        dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
-       obj->opo_empty = 1;
+       if (S_ISDIR(attr->la_mode))
+               obj->opo_empty = 1;
 
        return 0;
 }
index 5779b8d..c9e1e84 100644 (file)
@@ -864,11 +864,19 @@ static int osp_declare_object_create(const struct lu_env *env,
        struct osp_thread_info  *osi = osp_env_info(env);
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
-       const struct lu_fid     *fid;
+       const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
        int                      rc = 0;
 
        ENTRY;
 
+       if (is_remote_trans(th)) {
+               LASSERT(fid_is_sane(fid));
+
+               rc = osp_md_declare_object_create(env, dt, attr, hint, dof, th);
+
+               RETURN(rc);
+       }
+
        /* should happen to non-0 OSP only so that at least one object
         * has been already declared in the scenario and LOD should
         * cleanup that */
@@ -876,7 +884,6 @@ static int osp_declare_object_create(const struct lu_env *env,
                RETURN(-ENOSPC);
 
        LASSERT(d->opd_last_used_oid_file);
-       fid = lu_object_fid(&dt->do_lu);
 
        /*
         * There can be gaps in precreated ids and record to unlink llog
@@ -936,6 +943,16 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
        struct lu_fid           *fid = &osi->osi_fid;
        ENTRY;
 
+       if (is_remote_trans(th)) {
+               LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu)));
+
+               rc = osp_md_object_create(env, dt, attr, hint, dof, th);
+               if (rc == 0)
+                       o->opo_non_exist = 0;
+
+               RETURN(rc);
+       }
+
        o->opo_non_exist = 0;
        if (o->opo_reserved) {
                /* regular case, fid is assigned holding trunsaction open */
index 9b48d7c..1076bfb 100644 (file)
@@ -43,7 +43,7 @@ check_and_setup_lustre
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2c"
 
 [[ $(lustre_version_code ost1) -lt $(version_code 2.5.50) ]] &&
-       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13"
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14"
 
 build_test_filter
 
@@ -1274,6 +1274,53 @@ test_13() {
 }
 run_test 13 "LFSCK can repair crashed lmm_oi"
 
+# test_14: create files while OBD_FAIL_LFSCK_DANGLING is set on ost1,
+# check that listing them fails, run the layout LFSCK and check that it
+# reports the dangling references as repaired and that 'ls' works again.
+test_14() {
+       echo "#####"
+       echo "The OST-object referenced by the MDT-object should be there;"
+       echo "otherwise, the LFSCK should re-create the missed OST-object."
+       echo "#####"
+
+       # Start from a freshly formatted filesystem.
+       echo "stopall"
+       stopall > /dev/null
+       echo "formatall"
+       formatall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       # Single stripe on OST index 0, where the failure is injected.
+       mkdir -p $DIR/$tdir
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+
+       echo "Inject failure stub to simulate dangling referenced MDT-object"
+       #define OBD_FAIL_LFSCK_DANGLING 0x1610
+       do_facet ost1 $LCTL set_param fail_loc=0x1610
+       createmany -o $DIR/$tdir/f 64
+       do_facet ost1 $LCTL set_param fail_loc=0
+
+       echo "stopall to cleanup object cache"
+       stopall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       echo "'ls' should fail because of dangling referenced MDT-object"
+       ls -ail $DIR/$tdir > /dev/null 2>&1 && error "(1) ls should fail."
+
+       echo "Trigger layout LFSCK to find out dangling reference and fix them"
+       $START_LAYOUT || error "(2) Fail to start LFSCK for layout!"
+
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_layout |
+               awk '/^status/ { print \\\$2 }'" "completed" 6 || return 3
+
+       # NOTE(review): 64 files are created but 32 repairs are expected;
+       # presumably objects precreated before fail_loc was set are not
+       # dangling. Confirm against the OST precreate behaviour.
+       local repaired=$($SHOW_LAYOUT |
+                        awk '/^repaired_dangling/ { print $2 }')
+       # Quote and default the counter so that a missing
+       # repaired_dangling line is compared as 0 instead of making '['
+       # fail with a usage error.
+       [ "${repaired:-0}" -eq 32 ] ||
+               error "(4) Fail to repair dangling reference: $repaired"
+
+       echo "'ls' should success after layout LFSCK repairing"
+       ls -ail $DIR/$tdir > /dev/null || error "(5) ls should success."
+}
+run_test 14 "LFSCK can repair MDT-object with dangling reference"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size