struct lfsck_layout_slave_target *llsaa_llst;
};
+static struct lfsck_layout_object *
+lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
+ __u16 gen)
+{
+ struct lfsck_layout_object *llo;
+ int rc;
+
+ OBD_ALLOC_PTR(llo);
+ if (llo == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
+ if (rc != 0) {
+ OBD_FREE_PTR(llo);
+
+ return ERR_PTR(rc);
+ }
+
+ lu_object_get(&obj->do_lu);
+ llo->llo_obj = obj;
+ /* The gen can be used to check whether some others have changed the
+ * file layout after LFSCK pre-fetching but before real verification. */
+ llo->llo_gen = gen;
+ atomic_set(&llo->llo_ref, 1);
+
+ return llo;
+}
+
static inline void
lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
{
}
}
+static struct lfsck_layout_req *
+lfsck_layout_req_init(struct lfsck_layout_object *parent,
+ struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
+{
+ struct lfsck_layout_req *llr;
+
+ OBD_ALLOC_PTR(llr);
+ if (llr == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ INIT_LIST_HEAD(&llr->llr_list);
+ atomic_inc(&parent->llo_ref);
+ llr->llr_parent = parent;
+ llr->llr_child = child;
+ llr->llr_ost_idx = ost_idx;
+ llr->llr_lov_idx = lov_idx;
+
+ return llr;
+}
+
static inline void lfsck_layout_req_fini(const struct lu_env *env,
struct lfsck_layout_req *llr)
{
return empty;
}
+static int lfsck_layout_get_lovea(const struct lu_env *env,
+ struct dt_object *obj,
+ struct lu_buf *buf, ssize_t *buflen)
+{
+ int rc;
+
+again:
+ rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
+ if (rc == -ERANGE) {
+ rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
+ BYPASS_CAPA);
+ if (rc <= 0)
+ return rc;
+
+ lu_buf_realloc(buf, rc);
+ if (buflen != NULL)
+ *buflen = buf->lb_len;
+
+ if (buf->lb_buf == NULL)
+ return -ENOMEM;
+
+ goto again;
+ }
+
+ if (rc == -ENODATA)
+ rc = 0;
+
+ if (rc <= 0)
+ return rc;
+
+ if (unlikely(buf->lb_buf == NULL)) {
+ lu_buf_alloc(buf, rc);
+ if (buflen != NULL)
+ *buflen = buf->lb_len;
+
+ if (buf->lb_buf == NULL)
+ return -ENOMEM;
+
+ goto again;
+ }
+
+ return rc;
+}
+
+static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
+{
+ __u32 magic;
+ __u32 patten;
+
+ magic = le32_to_cpu(lmm->lmm_magic);
+ /* If magic crashed, keep it there. Sometime later, during OST-object
+ * orphan handling, if some OST-object(s) back-point to it, it can be
+ * verified and repaired. */
+ if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
+ return -EINVAL;
+
+ patten = le32_to_cpu(lmm->lmm_pattern);
+ /* XXX: currently, we only support LOV_PATTERN_RAID0. */
+ if (patten != LOV_PATTERN_RAID0)
+ return -EOPNOTSUPP;
+
+ return 0;
+}
+
static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
const struct lfsck_layout *src)
{
if (rc == sizeof(*lma)) {
lustre_lma_swab(lma);
- /* Generally, the low layer OSD create handler or OI scrub
- * will set the LMAC_FID_ON_OST for all external visible
- * OST-objects. But to make the otable-based iteration to
- * be independent from OI scrub in spite of it got failure
- * or not, we check the LMAC_FID_ON_OST here to guarantee
- * that the LFSCK will not repair something by wrong. */
return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
}
return rc;
}
+static int lfsck_layout_lock(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *obj,
+ struct lustre_handle *lh, __u64 bits)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ ldlm_policy_data_t *policy = &info->lti_policy;
+ struct ldlm_res_id *resid = &info->lti_resid;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ __u64 flags = LDLM_FL_ATOMIC_CB;
+ int rc;
+
+ LASSERT(lfsck->li_namespace != NULL);
+
+ memset(policy, 0, sizeof(*policy));
+ policy->l_inodebits.bits = bits;
+ fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
+ rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
+ policy, LCK_EX, &flags, ldlm_blocking_ast,
+ ldlm_completion_ast, NULL, NULL, 0,
+ LVB_T_NONE, NULL, lh);
+ if (rc == ELDLM_OK) {
+ rc = 0;
+ } else {
+ memset(lh, 0, sizeof(*lh));
+ rc = -EIO;
+ }
+
+ return rc;
+}
+
+static void lfsck_layout_unlock(struct lustre_handle *lh)
+{
+ if (lustre_handle_is_used(lh)) {
+ ldlm_lock_decref(lh, LCK_EX);
+ memset(lh, 0, sizeof(*lh));
+ }
+}
+
static int lfsck_layout_scan_orphan(const struct lu_env *env,
struct lfsck_component *com,
struct lfsck_tgt_desc *ltd)
RETURN(rc);
}
+/* Pre-fetch the attribute for each stripe in the given layout EA. */
+static int lfsck_layout_scan_stripes(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *parent,
+ struct lov_mds_md_v1 *lmm)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct lfsck_layout *lo = com->lc_file_ram;
+ struct lfsck_layout_master_data *llmd = com->lc_data;
+ struct lfsck_layout_object *llo = NULL;
+ struct lov_ost_data_v1 *objs;
+ struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
+ struct ptlrpc_thread *mthread = &lfsck->li_thread;
+ struct ptlrpc_thread *athread = &llmd->llmd_thread;
+ struct l_wait_info lwi = { 0 };
+ struct lu_buf *buf;
+ int rc = 0;
+ int i;
+ __u16 count;
+ __u16 gen;
+ ENTRY;
+
+ buf = lfsck_buf_get(env, &info->lti_pfid,
+ sizeof(struct filter_fid_old));
+ count = le16_to_cpu(lmm->lmm_stripe_count);
+ gen = le16_to_cpu(lmm->lmm_layout_gen);
+ if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
+ objs = &(lmm->lmm_objects[0]);
+ else
+ objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
+
+ for (i = 0; i < count; i++, objs++) {
+ struct lu_fid *fid = &info->lti_fid;
+ struct ost_id *oi = &info->lti_oi;
+ struct lfsck_layout_req *llr;
+ struct lfsck_tgt_desc *tgt = NULL;
+ struct dt_object *cobj = NULL;
+ __u32 index =
+ le32_to_cpu(objs->l_ost_idx);
+ bool wakeup = false;
+
+ l_wait_event(mthread->t_ctl_waitq,
+ bk->lb_async_windows == 0 ||
+ llmd->llmd_prefetched < bk->lb_async_windows ||
+ !thread_is_running(mthread) ||
+ thread_is_stopped(athread),
+ &lwi);
+
+ if (unlikely(!thread_is_running(mthread)) ||
+ thread_is_stopped(athread))
+ GOTO(out, rc = 0);
+
+ ostid_le_to_cpu(&objs->l_ost_oi, oi);
+ ostid_to_fid(fid, oi, index);
+ tgt = lfsck_tgt_get(ltds, index);
+ if (unlikely(tgt == NULL)) {
+ lo->ll_flags |= LF_INCOMPLETE;
+ goto next;
+ }
+
+ cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
+ if (IS_ERR(cobj)) {
+ rc = PTR_ERR(cobj);
+ goto next;
+ }
+
+ rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
+ if (rc != 0)
+ goto next;
+
+ rc = dt_declare_xattr_get(env, cobj, buf, XATTR_NAME_FID,
+ BYPASS_CAPA);
+ if (rc != 0)
+ goto next;
+
+ if (llo == NULL) {
+ llo = lfsck_layout_object_init(env, parent, gen);
+ if (IS_ERR(llo)) {
+ rc = PTR_ERR(llo);
+ goto next;
+ }
+ }
+
+ llr = lfsck_layout_req_init(llo, cobj, index, i);
+ if (IS_ERR(llr)) {
+ rc = PTR_ERR(llr);
+ goto next;
+ }
+
+ cobj = NULL;
+ spin_lock(&llmd->llmd_lock);
+ if (llmd->llmd_assistant_status < 0) {
+ spin_unlock(&llmd->llmd_lock);
+ lfsck_layout_req_fini(env, llr);
+ lfsck_tgt_put(tgt);
+ RETURN(llmd->llmd_assistant_status);
+ }
+
+ list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
+ if (llmd->llmd_prefetched == 0)
+ wakeup = true;
+
+ llmd->llmd_prefetched++;
+ spin_unlock(&llmd->llmd_lock);
+ if (wakeup)
+ wake_up_all(&athread->t_ctl_waitq);
+
+next:
+ down_write(&com->lc_sem);
+ com->lc_new_checked++;
+ if (rc < 0)
+ lo->ll_objs_failed_phase1++;
+ up_write(&com->lc_sem);
+
+ if (cobj != NULL && !IS_ERR(cobj))
+ lu_object_put(env, &cobj->do_lu);
+
+ if (likely(tgt != NULL))
+ lfsck_tgt_put(tgt);
+
+ if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+ GOTO(out, rc);
+ }
+
+ GOTO(out, rc = 0);
+
+out:
+ if (llo != NULL && !IS_ERR(llo))
+ lfsck_layout_object_put(env, llo);
+
+ return rc;
+}
+
+/* For the given object, read its layout EA locally. For each stripe, pre-fetch
+ * the OST-object's attribute and generate an structure lfsck_layout_req on the
+ * list ::llmd_req_list.
+ *
+ * For each request on above list, the lfsck_layout_assistant thread compares
+ * the OST side attribute with local attribute, if inconsistent, then repair it.
+ *
+ * All above processing is async mode with pipeline. */
static int lfsck_layout_master_exec_oit(const struct lu_env *env,
struct lfsck_component *com,
struct dt_object *obj)
{
- /* XXX: To be implemented in other patches.
- *
- * For the given object, read its layout EA locally. For each stripe,
- * pre-fetch the OST-object's attribute and generate an structure
- * lfsck_layout_req on the list ::llmd_req_list.
- *
- * For each request on the ::llmd_req_list, the lfsck_layout_assistant
- * thread will compare the OST side attribute with local attribute,
- * if inconsistent, then repair it.
- *
- * All above processing is async mode with pipeline. */
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct ost_id *oi = &info->lti_oi;
+ struct lfsck_layout *lo = com->lc_file_ram;
+ struct lfsck_layout_master_data *llmd = com->lc_data;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct thandle *handle = NULL;
+ struct lu_buf *buf = &info->lti_big_buf;
+ struct lov_mds_md_v1 *lmm = NULL;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct lustre_handle lh = { 0 };
+ ssize_t buflen = buf->lb_len;
+ int rc = 0;
+ bool locked = false;
+ bool stripe = false;
+ ENTRY;
- return 0;
+ if (!S_ISREG(lfsck_object_type(obj)))
+ GOTO(out, rc = 0);
+
+ if (llmd->llmd_assistant_status < 0)
+ GOTO(out, rc = -ESRCH);
+
+ fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
+ lmm_oi_cpu_to_le(oi, oi);
+ dt_read_lock(env, obj, 0);
+ locked = true;
+
+again:
+ rc = lfsck_layout_get_lovea(env, obj, buf, &buflen);
+ if (rc <= 0)
+ GOTO(out, rc);
+
+ buf->lb_len = rc;
+ lmm = buf->lb_buf;
+ rc = lfsck_layout_verify_header(lmm);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
+ GOTO(out, stripe = true);
+
+ /* Inconsistent lmm_oi, should be repaired. */
+ CDEBUG(D_LFSCK, "Repair bad lmm_oi for "DFID"\n",
+ PFID(lfsck_dto2fid(obj)));
+
+ if (bk->lb_param & LPF_DRYRUN) {
+ down_write(&com->lc_sem);
+ lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
+ up_write(&com->lc_sem);
+
+ GOTO(out, stripe = true);
+ }
+
+ if (!lustre_handle_is_used(&lh)) {
+ dt_read_unlock(env, obj);
+ locked = false;
+ buf->lb_len = buflen;
+ rc = lfsck_layout_lock(env, com, obj, &lh,
+ MDS_INODELOCK_LAYOUT |
+ MDS_INODELOCK_XATTR);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ handle = dt_trans_create(env, dev);
+ if (IS_ERR(handle))
+ GOTO(out, rc = PTR_ERR(handle));
+
+ rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+ LU_XATTR_REPLACE, handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_trans_start_local(env, dev, handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ dt_write_lock(env, obj, 0);
+ locked = true;
+
+ goto again;
+ }
+
+ lmm->lmm_oi = *oi;
+ rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+ LU_XATTR_REPLACE, handle, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ down_write(&com->lc_sem);
+ lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
+ up_write(&com->lc_sem);
+
+ GOTO(out, stripe = true);
+
+out:
+ if (locked) {
+ if (lustre_handle_is_used(&lh))
+ dt_write_unlock(env, obj);
+ else
+ dt_read_unlock(env, obj);
+ }
+
+ if (handle != NULL && !IS_ERR(handle))
+ dt_trans_stop(env, dev, handle);
+
+ lfsck_layout_unlock(&lh);
+ if (stripe) {
+ rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
+ } else {
+ down_write(&com->lc_sem);
+ com->lc_new_checked++;
+ if (rc < 0)
+ lo->ll_objs_failed_phase1++;
+ up_write(&com->lc_sem);
+ }
+ buf->lb_len = buflen;
+
+ return rc;
}
static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
#include <lustre_fid.h>
#include <lustre_lmv.h>
#include <obd_lov.h>
+#include <md_object.h>
#include "lod_internal.h"
.dbo_write = lod_write
};
-static int lod_object_init(const struct lu_env *env, struct lu_object *o,
+static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
const struct lu_object_conf *conf)
{
- struct lod_device *d = lu2lod_dev(o->lo_dev);
- struct lu_object *below;
- struct lu_device *under;
+ struct lod_device *lod = lu2lod_dev(lo->lo_dev);
+ struct lu_device *cdev = NULL;
+ struct lu_object *cobj;
+ struct lod_tgt_descs *ltd = NULL;
+ struct lod_tgt_desc *tgt;
+ mdsno_t idx = 0;
+ int type = LU_SEQ_RANGE_ANY;
+ int rc;
ENTRY;
- /*
- * create local object
- */
- under = &d->lod_child->dd_lu_dev;
- below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
- if (below == NULL)
+ rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (type == LU_SEQ_RANGE_MDT &&
+ idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
+ cdev = &lod->lod_child->dd_lu_dev;
+ } else if (type == LU_SEQ_RANGE_MDT) {
+ ltd = &lod->lod_mdt_descs;
+ lod_getref(ltd);
+ } else if (type == LU_SEQ_RANGE_OST) {
+ ltd = &lod->lod_ost_descs;
+ lod_getref(ltd);
+ } else {
+ LBUG();
+ }
+
+ if (ltd != NULL) {
+ if (ltd->ltd_tgts_size > idx &&
+ cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
+ tgt = LTD_TGT(ltd, idx);
+
+ LASSERT(tgt != NULL);
+ LASSERT(tgt->ltd_tgt != NULL);
+
+ cdev = &(tgt->ltd_tgt->dd_lu_dev);
+ }
+ lod_putref(lod, ltd);
+ }
+
+ if (unlikely(cdev == NULL))
+ RETURN(-ENOENT);
+
+ cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
+ if (unlikely(cobj == NULL))
RETURN(-ENOMEM);
- lu_object_add(o, below);
+ lu_object_add(lo, cobj);
RETURN(0);
}
.loo_object_release = lod_object_release,
.loo_object_print = lod_object_print,
};
-
-/**
- * Init remote lod object
- */
-static int lod_robject_init(const struct lu_env *env, struct lu_object *lo,
- const struct lu_object_conf *conf)
-{
- struct lod_device *lod = lu2lod_dev(lo->lo_dev);
- struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
- struct lu_device *c_dev = NULL;
- struct lu_object *c_obj;
- int i;
- ENTRY;
-
- lod_getref(ltd);
- if (ltd->ltd_tgts_size > 0) {
- cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
- struct lod_tgt_desc *tgt;
- tgt = LTD_TGT(ltd, i);
- LASSERT(tgt && tgt->ltd_tgt);
- if (tgt->ltd_index ==
- lu2lod_obj(lo)->ldo_mds_num) {
- c_dev = &(tgt->ltd_tgt->dd_lu_dev);
- break;
- }
- }
- }
- lod_putref(lod, ltd);
-
- if (unlikely(c_dev == NULL))
- RETURN(-ENOENT);
-
- c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev);
- if (unlikely(c_obj == NULL))
- RETURN(-ENOMEM);
-
- lu_object_add(lo, c_obj);
-
- RETURN(0);
-}
-
-struct lu_object_operations lod_lu_robj_ops = {
- .loo_object_init = lod_robject_init,
- .loo_object_start = lod_object_start,
- .loo_object_free = lod_object_free,
- .loo_object_release = lod_object_release,
- .loo_object_print = lod_object_print,
-};