X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fofd%2Fofd_objects.c;h=c60e18f7e63a95597f0c01ced862a0a6a5485606;hp=cf349ca395406abaedd318d049555155222dfa58;hb=b808da75ffe27b6199c636ff216817e12c6d3fa6;hpb=a42560217562c7b00e5b680b347687f894939601 diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index cf349ca..c60e18f 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,6 +42,8 @@ #define DEBUG_SUBSYSTEM S_FILTER #include +#include +#include #include "ofd_internal.h" @@ -51,7 +53,9 @@ int ofd_version_get_check(struct ofd_thread_info *info, dt_obj_version_t curr_version; LASSERT(ofd_object_exists(fo)); - LASSERT(info->fti_exp); + + if (info->fti_exp == NULL) + RETURN(0); curr_version = dt_version_get(info->fti_env, ofd_object_child(fo)); if ((__s64)curr_version == -EOPNOTSUPP) @@ -61,9 +65,9 @@ int ofd_version_get_check(struct ofd_thread_info *info, info->fti_pre_version != curr_version) { CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", info->fti_pre_version, curr_version); - cfs_spin_lock(&info->fti_exp->exp_lock); + spin_lock(&info->fti_exp->exp_lock); info->fti_exp->exp_vbr_failed = 1; - cfs_spin_unlock(&info->fti_exp->exp_lock); + spin_unlock(&info->fti_exp->exp_lock); RETURN (-EOVERFLOW); } info->fti_pre_version = curr_version; @@ -83,61 +87,43 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, if (likely(!IS_ERR(o))) fo = ofd_obj(o); else - fo = (struct ofd_object *)o; /* return error */ + fo = ERR_CAST(o); /* return error */ + RETURN(fo); } -struct ofd_object *ofd_object_find_or_create(const struct lu_env *env, - struct ofd_device *ofd, - const struct lu_fid *fid, - struct lu_attr
*attr) +int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo) { struct ofd_thread_info *info = ofd_info(env); - struct lu_object *fo_obj; - struct dt_object *dto; + struct filter_fid_old *ff = &info->fti_mds_fid_old; + struct lu_buf *buf = &info->fti_buf; + struct lu_fid *pfid = &fo->ofo_pfid; + int rc = 0; - ENTRY; + if (fid_is_sane(pfid)) + return 0; - info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG); + buf->lb_buf = ff; + buf->lb_len = sizeof(*ff); + rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID, + BYPASS_CAPA); + if (rc < 0) + return rc; - dto = dt_find_or_create(env, ofd->ofd_osd, fid, &info->fti_dof, attr); - if (IS_ERR(dto)) - RETURN((struct ofd_object *)dto); + if (rc < sizeof(struct lu_fid)) { + fid_zero(pfid); - fo_obj = lu_object_locate(dto->do_lu.lo_header, - ofd->ofd_dt_dev.dd_lu_dev.ld_type); - RETURN(ofd_obj(fo_obj)); -} - -int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo) -{ - struct ofd_thread_info *info = ofd_info(env); - int rc = 0; + return -ENODATA; + } - ENTRY; + pfid->f_seq = le64_to_cpu(ff->ff_parent.f_seq); + pfid->f_oid = le32_to_cpu(ff->ff_parent.f_oid); + /* Currently, the filter_fid::ff_parent::f_ver is not the real parent + * MDT-object's FID::f_ver, instead it is the OST-object index in its + * parent MDT-object's layout EA. */ + pfid->f_stripe_idx = le32_to_cpu(ff->ff_parent.f_stripe_idx); - if (!fo->ofo_ff_exists) { - /* - * This actually means that we don't know whether the object - * has the "fid" EA or not. - */ - info->fti_buf.lb_buf = &info->fti_mds_fid2; - info->fti_buf.lb_len = sizeof(info->fti_mds_fid2); - rc = dt_xattr_get(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, BYPASS_CAPA); - if (rc >= 0 || rc == -ENODATA) { - /* - * Here we assume that, if the object doesn't have the - * "fid" EA, the caller will add one, unless a fatal - * error (e.g., a memory or disk failure) prevents it - * from doing so. 
- */ - fo->ofo_ff_exists = 1; - } - if (rc > 0) - rc = 0; - } - RETURN(rc); + return 0; } void ofd_object_put(const struct lu_env *env, struct ofd_object *fo) @@ -145,35 +131,39 @@ void ofd_object_put(const struct lu_env *env, struct ofd_object *fo) lu_object_put(env, &fo->ofo_obj.do_lu); } -int ofd_precreate_object(const struct lu_env *env, struct ofd_device *ofd, - obd_id id, obd_seq group) +int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, + obd_id id, struct ofd_seq *oseq, int nr, int sync) { struct ofd_thread_info *info = ofd_info(env); - struct ofd_object *fo; + struct ofd_object *fo = NULL; struct dt_object *next; struct thandle *th; + struct ofd_object **batch; + struct lu_fid *fid = &info->fti_fid; obd_id tmp; int rc; + int i; + int objects = 0; + int nr_saved = nr; ENTRY; /* Don't create objects beyond the valid range for this SEQ */ - if (unlikely(fid_seq_is_mdt0(group) && id >= IDIF_MAX_OID)) { - CERROR("%s:"POSTID" hit the IDIF_MAX_OID (1<<48)!\n", - ofd_name(ofd), id, group); + if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && + (id + nr) >= IDIF_MAX_OID)) { + CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n", + ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); - } else if (unlikely(!fid_seq_is_mdt0(group) && id >= OBIF_MAX_OID)) { - CERROR("%s:"POSTID" hit the OBIF_MAX_OID (1<<32)!\n", - ofd_name(ofd), id, group); + } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && + (id + nr) >= OBIF_MAX_OID)) { + CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n", + ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); } - info->fti_ostid.oi_id = id; - info->fti_ostid.oi_seq = group; - fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0); - fo = ofd_object_find(env, ofd, &info->fti_fid); - if (IS_ERR(fo)) - RETURN(PTR_ERR(fo)); + OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *)); + if (batch == NULL) + RETURN(-ENOMEM); info->fti_attr.la_valid = LA_TYPE | LA_MODE; /* @@ 
-192,61 +182,164 @@ int ofd_precreate_object(const struct lu_env *env, struct ofd_device *ofd, info->fti_attr.la_mtime = 0; info->fti_attr.la_ctime = 0; - next = ofd_object_child(fo); - LASSERT(next != NULL); + LASSERT(id != 0); + + /* prepare objects */ + *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu); + for (i = 0; i < nr; i++) { + rc = fid_set_id(fid, id + i); + if (rc != 0) { + if (i == 0) + GOTO(out, rc); + + nr = i; + break; + } + + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) { + if (i == 0) + GOTO(out, rc = PTR_ERR(fo)); + nr = i; + break; + } + + ofd_write_lock(env, fo); + batch[i] = fo; + } info->fti_buf.lb_buf = &tmp; info->fti_buf.lb_len = sizeof(tmp); info->fti_off = 0; - ofd_write_lock(env, fo); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) - GOTO(out_unlock, rc = PTR_ERR(th)); + GOTO(out, rc = PTR_ERR(th)); + + th->th_sync |= sync; - rc = dt_declare_record_write(env, ofd->ofd_lastid_obj[group], - sizeof(tmp), info->fti_off, th); + rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf, + info->fti_off, th); if (rc) GOTO(trans_stop, rc); - if (unlikely(ofd_object_exists(fo))) { - /* object may exist being re-created by write replay */ - CDEBUG(D_INODE, "object %u/"LPD64" exists: "DFID"\n", - (unsigned) group, id, PFID(&info->fti_fid)); - rc = dt_trans_start_local(env, ofd->ofd_osd, th); - if (rc) - GOTO(trans_stop, rc); - GOTO(last_id_write, rc); + for (i = 0; i < nr; i++) { + fo = batch[i]; + LASSERT(fo); + + if (unlikely(ofd_object_exists(fo))) { + /* object may exist being re-created by write replay */ + CDEBUG(D_INODE, "object "LPX64"/"LPX64" exists: " + DFID"\n", ostid_seq(&oseq->os_oi), id, + PFID(lu_object_fid(&fo->ofo_obj.do_lu))); + continue; + } + + next = ofd_object_child(fo); + LASSERT(next != NULL); + + rc = dt_declare_create(env, next, &info->fti_attr, NULL, + &info->fti_dof, th); + if (rc) { + nr = i; + break; + } } - rc = dt_declare_create(env, next, &info->fti_attr, NULL, - &info->fti_dof, th); - if 
(rc) - GOTO(trans_stop, rc); rc = dt_trans_start_local(env, ofd->ofd_osd, th); if (rc) GOTO(trans_stop, rc); - CDEBUG(D_OTHER, "create new object %lu:%llu\n", - (unsigned long) info->fti_fid.f_oid, info->fti_fid.f_seq); + CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n", + ofd_name(ofd), PFID(fid), nr); + + LASSERT(nr > 0); + + /* When the LFSCK scanning the whole device to verify the LAST_ID file + * consistency, it will load the last_id into RAM firstly, and compare + * the last_id with each OST-object's ID. If the later one is larger, + * then it will regard the LAST_ID file crashed. But during the LFSCK + * scanning, the OFD may continue to create new OST-objects. Those new + * created OST-objects will have larger IDs than the LFSCK known ones. + * So from the LFSCK view, it needs to re-load the last_id from disk + * file, and if the latest last_id is still smaller than the object's + * ID, then the LAST_ID file is real crashed. + * + * To make above mechanism to work, before OFD pre-create OST-objects, + * it needs to update the LAST_ID file firstly, otherwise, the LFSCK + * may cannot get latest last_id although new OST-object created. */ + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) { + tmp = cpu_to_le64(id + nr - 1); + dt_write_lock(env, oseq->os_lastid_obj, 0); + rc = dt_record_write(env, oseq->os_lastid_obj, + &info->fti_buf, &info->fti_off, th); + dt_write_unlock(env, oseq->os_lastid_obj); + if (rc != 0) + GOTO(trans_stop, rc); + } - rc = dt_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th); - if (rc) - GOTO(trans_stop, rc); - LASSERT(ofd_object_exists(fo)); + for (i = 0; i < nr; i++) { + fo = batch[i]; + LASSERT(fo); -last_id_write: - ofd_last_id_set(ofd, id, group); + /* Only the new created objects need to be recorded. 
*/ + if (ofd->ofd_osd->dd_record_fid_accessed) { + lfsck_pack_rfa(&ofd_info(env)->fti_lr, + lu_object_fid(&fo->ofo_obj.do_lu)); + lfsck_in_notify(env, ofd->ofd_osd, + &ofd_info(env)->fti_lr); + } + + if (likely(!ofd_object_exists(fo) && + !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) { + next = ofd_object_child(fo); + LASSERT(next != NULL); + + rc = dt_create(env, next, &info->fti_attr, NULL, + &info->fti_dof, th); + if (rc) + break; + LASSERT(ofd_object_exists(fo)); + } + ofd_seq_last_oid_set(oseq, id + i); + } + + objects = i; + /* NOT all the wanted objects have been created, + * set the LAST_ID as the real created. */ + if (unlikely(objects < nr)) { + int rc1; + + info->fti_off = 0; + tmp = cpu_to_le64(ofd_seq_last_oid(oseq)); + dt_write_lock(env, oseq->os_lastid_obj, 0); + rc1 = dt_record_write(env, oseq->os_lastid_obj, + &info->fti_buf, &info->fti_off, th); + dt_write_unlock(env, oseq->os_lastid_obj); + if (rc1 != 0) + CERROR("%s: fail to reset the LAST_ID for seq ("LPX64 + ") from "LPU64" to "LPU64"\n", ofd_name(ofd), + ostid_seq(&oseq->os_oi), id + nr - 1, + ofd_seq_last_oid(oseq)); + } - tmp = cpu_to_le64(ofd_last_id(ofd, group)); - rc = dt_record_write(env, ofd->ofd_lastid_obj[group], &info->fti_buf, - &info->fti_off, th); trans_stop: ofd_trans_stop(env, ofd, th, rc); -out_unlock: - ofd_write_unlock(env, fo); - ofd_object_put(env, fo); - RETURN(rc); +out: + for (i = 0; i < nr_saved; i++) { + fo = batch[i]; + if (fo) { + ofd_write_unlock(env, fo); + ofd_object_put(env, fo); + } + } + OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *)); + + CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER, + "created %d/%d objects: %d\n", objects, nr_saved, rc); + + LASSERT(ergo(objects == 0, rc < 0)); + RETURN(objects > 0 ? 
objects : rc); } /* @@ -328,7 +421,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, GOTO(unlock, rc); if (ff != NULL) { - rc = ofd_object_ff_check(env, fo); + rc = ofd_object_ff_load(env, fo); if (rc == -ENODATA) ff_needed = 1; else if (rc < 0) @@ -362,20 +455,34 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) + if (ff_needed) { rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA); + if (rc == 0) { + fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); + fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); + /* Currently, the filter_fid::ff_parent::f_ver is not + * the real parent MDT-object's FID::f_ver, instead it + * is the OST-object index in its parent MDT-object's + * layout EA. */ + fo->ofo_pfid.f_stripe_idx = + le32_to_cpu(ff->ff_parent.f_stripe_idx); + } + } + + GOTO(stop, rc); stop: ofd_trans_stop(env, ofd, th, rc); unlock: ofd_write_unlock(env, fo); - RETURN(rc); + + return rc; } int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, __u64 start, __u64 end, struct lu_attr *la, - struct filter_fid *ff) + struct filter_fid *ff, struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); @@ -399,6 +506,12 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (!ofd_object_exists(fo)) GOTO(unlock, rc = -ENOENT); + if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) { + rc = ofd_verify_ff(env, fo, oa); + if (rc != 0) + GOTO(unlock, rc); + } + /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) @@ -409,7 +522,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, GOTO(unlock, rc); if (ff != NULL) { - rc = ofd_object_ff_check(env, fo); + rc = ofd_object_ff_load(env, fo); if (rc == -ENODATA) ff_needed = 1; else if (rc < 0) @@ -451,15 +564,29 @@ int ofd_object_punch(const struct lu_env *env, 
struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) + if (ff_needed) { rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA); + if (rc == 0) { + fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq); + fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid); + /* Currently, the filter_fid::ff_parent::f_ver is not + * the real parent MDT-object's FID::f_ver, instead it + * is the OST-object index in its parent MDT-object's + * layout EA. */ + fo->ofo_pfid.f_stripe_idx = + le32_to_cpu(ff->ff_parent.f_stripe_idx); + } + } + + GOTO(stop, rc); stop: ofd_trans_stop(env, ofd, th, rc); unlock: ofd_write_unlock(env, fo); - RETURN(rc); + + return rc; } int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo, @@ -510,7 +637,7 @@ int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo, rc = dt_attr_get(env, ofd_object_child(fo), la, ofd_object_capa(env, fo)); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0) +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0) /* Try to correct for a bug in 2.1.0 (LU-221) that caused * negative timestamps to appear to be in the far future, * due old timestamp being stored on disk as an unsigned value.