Whamcloud - gitweb
LU-3590 lfsck: repair MDT-object with dangling reference
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
index 3ee9592..15e9c05 100644 (file)
@@ -1368,6 +1368,22 @@ static void lfsck_layout_unlock(struct lustre_handle *lh)
        }
 }
 
+static int lfsck_layout_trans_stop(const struct lu_env *env,
+                                  struct dt_device *dev,
+                                  struct thandle *handle, int result)
+{
+       int rc;
+
+       handle->th_result = result;
+       rc = dt_trans_stop(env, dev, handle);
+       if (rc > 0)
+               rc = 0;
+       else if (rc == 0)
+               rc = 1;
+
+       return rc;
+}
+
 static int lfsck_layout_scan_orphan(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct lfsck_tgt_desc *ltd)
@@ -1377,6 +1393,181 @@ static int lfsck_layout_scan_orphan(const struct lu_env *env,
        return 0;
 }
 
+/* For the MDT-object with dangling reference, we need to re-create
+ * the missed OST-object with the known FID/owner information. */
+static int lfsck_layout_recreate_ostobj(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct lfsck_layout_req *llr,
+                                       struct lu_attr *la)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct filter_fid               *pfid   = &info->lti_new_pfid;
+       struct dt_allocation_hint       *hint   = &info->lti_hint;
+       struct dt_object                *parent = llr->llr_parent->llo_obj;
+       struct dt_object                *child  = llr->llr_child;
+       struct dt_device                *dev    = lfsck_obj2dt_dev(child);
+       const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
+       struct thandle                  *handle;
+       struct lu_buf                   *buf;
+       struct lustre_handle             lh     = { 0 };
+       int                              rc;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "Repair dangling reference for: parent "DFID
+              ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
+              PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
+              llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid);
+
+       rc = lfsck_layout_lock(env, com, parent, &lh,
+                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       if (rc != 0)
+               RETURN(rc);
+
+       handle = dt_trans_create(env, dev);
+       if (IS_ERR(handle))
+               GOTO(unlock1, rc = PTR_ERR(handle));
+
+       hint->dah_parent = NULL;
+       hint->dah_mode = 0;
+       pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
+       pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
+       pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
+       buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
+
+       rc = dt_declare_create(env, child, la, hint, NULL, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
+                                 LU_XATTR_CREATE, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start(env, dev, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_read_lock(env, parent, 0);
+       if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
+               GOTO(unlock2, rc = 1);
+
+       rc = dt_create(env, child, la, hint, NULL, handle);
+       if (rc != 0)
+               GOTO(unlock2, rc);
+
+       rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
+                         handle, BYPASS_CAPA);
+
+       GOTO(unlock2, rc);
+
+unlock2:
+       dt_read_unlock(env, parent);
+
+stop:
+       rc = lfsck_layout_trans_stop(env, dev, handle, rc);
+
+unlock1:
+       lfsck_layout_unlock(&lh);
+
+       return rc;
+}
+
+static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
+                                            struct lfsck_component *com,
+                                            struct lfsck_layout_req *llr)
+{
+       struct lfsck_layout                  *lo     = com->lc_file_ram;
+       struct lfsck_thread_info             *info   = lfsck_env_info(env);
+       struct dt_object                     *parent = llr->llr_parent->llo_obj;
+       struct dt_object                     *child  = llr->llr_child;
+       struct lu_attr                       *pla    = &info->lti_la;
+       struct lu_attr                       *cla    = &info->lti_la2;
+       struct lfsck_instance                *lfsck  = com->lc_lfsck;
+       struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
+       enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
+       int                                   rc;
+       ENTRY;
+
+       rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
+       if (rc != 0) {
+               if (lu_object_is_dying(parent->do_lu.lo_header))
+                       RETURN(0);
+
+               GOTO(out, rc);
+       }
+
+       rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
+       if (rc == -ENOENT) {
+               if (lu_object_is_dying(parent->do_lu.lo_header))
+                       RETURN(0);
+
+               type = LLIT_DANGLING;
+               goto repair;
+       }
+
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* XXX: other inconsistency will be checked in other patches. */
+
+repair:
+       if (bk->lb_param & LPF_DRYRUN) {
+               if (type != LLIT_NONE)
+                       GOTO(out, rc = 1);
+               else
+                       GOTO(out, rc = 0);
+       }
+
+       switch (type) {
+       case LLIT_DANGLING:
+               memset(cla, 0, sizeof(*cla));
+               cla->la_uid = pla->la_uid;
+               cla->la_gid = pla->la_gid;
+               cla->la_mode = S_IFREG | 0666;
+               cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
+                               LA_ATIME | LA_MTIME | LA_CTIME;
+               rc = lfsck_layout_recreate_ostobj(env, com, llr, cla);
+               break;
+
+       /* XXX: other inconsistency will be fixed in other patches. */
+
+       case LLIT_UNMATCHED_PAIR:
+               break;
+       case LLIT_MULTIPLE_REFERENCED:
+               break;
+       case LLIT_INCONSISTENT_OWNER:
+               break;
+       default:
+               rc = 0;
+               break;
+       }
+
+       GOTO(out, rc);
+
+out:
+       down_write(&com->lc_sem);
+       if (rc < 0) {
+               /* If cannot touch the target server,
+                * mark the LFSCK as INCOMPLETE. */
+               if (rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -ETIMEDOUT ||
+                   rc == -EHOSTDOWN || rc == -EHOSTUNREACH) {
+                       lo->ll_flags |= LF_INCOMPLETE;
+                       lo->ll_objs_skipped++;
+                       rc = 0;
+               } else {
+                       lo->ll_objs_failed_phase1++;
+               }
+       } else if (rc > 0) {
+               LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
+                        "unknown type = %d\n", type);
+
+               lo->ll_objs_repaired[type - 1]++;
+       }
+       up_write(&com->lc_sem);
+
+       return rc;
+}
+
 static int lfsck_layout_assistant(void *args)
 {
        struct lfsck_thread_args        *lta     = args;
@@ -1432,15 +1623,16 @@ static int lfsck_layout_assistant(void *args)
                        if (unlikely(llmd->llmd_exit))
                                GOTO(cleanup1, rc = llmd->llmd_post_result);
 
-                       /* XXX: To be extended in other patch.
-                        *
-                        * Compare the OST side attribute with local attribute,
-                        * and fix it if found inconsistency. */
-
-                       spin_lock(&llmd->llmd_lock);
                        llr = list_entry(llmd->llmd_req_list.next,
                                         struct lfsck_layout_req,
                                         llr_list);
+                       /* Only the lfsck_layout_assistant thread itself can
+                        * remove the "llr" from the head of the list, LFSCK
+                        * engine thread only inserts other new "lld" at the
+                        * end of the list. So it is safe to handle current
+                        * "llr" without the spin_lock. */
+                       rc = lfsck_layout_assistant_handle_one(env, com, llr);
+                       spin_lock(&llmd->llmd_lock);
                        list_del_init(&llr->llr_list);
                        if (bk->lb_async_windows != 0 &&
                            llmd->llmd_prefetched >= bk->lb_async_windows)
@@ -1452,6 +1644,8 @@ static int lfsck_layout_assistant(void *args)
                                wake_up_all(&mthread->t_ctl_waitq);
 
                        lfsck_layout_req_fini(env, llr);
+                       if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+                               GOTO(cleanup1, rc);
                }
 
                /* Wakeup the master engine if it is waiting in checkpoint. */
@@ -1564,6 +1758,9 @@ orphan:
 cleanup1:
        /* Cleanup the unfinished requests. */
        spin_lock(&llmd->llmd_lock);
+       if (rc < 0)
+               llmd->llmd_assistant_status = rc;
+
        while (!list_empty(&llmd->llmd_req_list)) {
                llr = list_entry(llmd->llmd_req_list.next,
                                 struct lfsck_layout_req,
@@ -2152,7 +2349,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
        __u16                            gen;
        ENTRY;
 
-       buf = lfsck_buf_get(env, &info->lti_pfid,
+       buf = lfsck_buf_get(env, &info->lti_old_pfid,
                            sizeof(struct filter_fid_old));
        count = le16_to_cpu(lmm->lmm_stripe_count);
        gen = le16_to_cpu(lmm->lmm_layout_gen);