X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fofd%2Fofd_objects.c;h=f6fd068d0ddf8a1b7efd47b4528efa11f2b08a96;hp=32ca90290d3c316a24fc9c2252a215242e028117;hb=46b927d45eb2ee5db3e35df2a0ade4c11ba9f345;hpb=27f77f5de6fe27611c9f819f8cdaba89d70f15d9 diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index 32ca902..f6fd068 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -84,7 +84,8 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, if (likely(!IS_ERR(o))) fo = ofd_obj(o); else - fo = (struct ofd_object *)o; /* return error */ + fo = ERR_CAST(o); /* return error */ + RETURN(fo); } @@ -103,7 +104,7 @@ struct ofd_object *ofd_object_find_or_create(const struct lu_env *env, dto = dt_find_or_create(env, ofd->ofd_osd, fid, &info->fti_dof, attr); if (IS_ERR(dto)) - RETURN((struct ofd_object *)dto); + RETURN(ERR_CAST(dto)); fo_obj = lu_object_locate(dto->do_lu.lo_header, ofd->ofd_dt_dev.dd_lu_dev.ld_type); @@ -112,8 +113,7 @@ struct ofd_object *ofd_object_find_or_create(const struct lu_env *env, int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo) { - struct ofd_thread_info *info = ofd_info(env); - int rc = 0; + int rc = 0; ENTRY; @@ -122,9 +122,7 @@ int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo) * This actually means that we don't know whether the object * has the "fid" EA or not. */ - info->fti_buf.lb_buf = &info->fti_mds_fid2; - info->fti_buf.lb_len = sizeof(info->fti_mds_fid2); - rc = dt_xattr_get(env, ofd_object_child(fo), &info->fti_buf, + rc = dt_xattr_get(env, ofd_object_child(fo), &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA); if (rc >= 0 || rc == -ENODATA) { /* @@ -154,6 +152,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, struct dt_object *next; struct thandle *th; struct ofd_object **batch; + struct lu_fid *fid = &info->fti_fid; obd_id tmp; int rc; int i; @@ -163,14 +162,15 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, ENTRY; /* Don't create objects beyond the valid range for this SEQ */ - if (unlikely(fid_seq_is_mdt0(oseq->os_seq) && (id + nr) >= IDIF_MAX_OID)) { + if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && + (id + nr) >= IDIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n", - ofd_name(ofd), id, oseq->os_seq); + ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); - } else if (unlikely(!fid_seq_is_mdt0(oseq->os_seq) && + } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && (id + nr) >= OBIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n", - ofd_name(ofd), id, oseq->os_seq); + ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); } @@ -195,21 +195,21 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, info->fti_attr.la_mtime = 0; info->fti_attr.la_ctime = 0; + LASSERT(id != 0); + /* prepare objects */ + *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu); for (i = 0; i < nr; i++) { - info->fti_ostid.oi_id = id + i; - info->fti_ostid.oi_seq = oseq->os_seq; - - rc = fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0); - if (rc) { + rc = fid_set_id(fid, id + i); + if (rc != 0) { if (i == 0) - GOTO(out, rc = PTR_ERR(fo)); + GOTO(out, rc); nr = i; break; } - fo = ofd_object_find(env, ofd, &info->fti_fid); + fo = ofd_object_find(env, ofd, fid); if (IS_ERR(fo)) { if (i == 0) GOTO(out, rc = PTR_ERR(fo)); @@ -243,8 +243,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, if (unlikely(ofd_object_exists(fo))) { /* object may exist being re-created by write replay */ CDEBUG(D_INODE, "object "LPX64"/"LPX64" exists: " - DFID"\n", oseq->os_seq, id, - PFID(&info->fti_fid)); + DFID"\n", ostid_seq(&oseq->os_oi), id, + PFID(lu_object_fid(&fo->ofo_obj.do_lu))); continue; } @@ -263,13 +263,40 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, if (rc) GOTO(trans_stop, rc); - CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(&info->fti_fid)); + CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n", + ofd_name(ofd), PFID(fid), nr); + + LASSERT(nr > 0); + + /* When the LFSCK scanning the whole device to verify the LAST_ID file + * consistency, it will load the last_id into RAM firstly, and compare + * the last_id with each OST-object's ID. If the later one is larger, + * then it will regard the LAST_ID file crashed. But during the LFSCK + * scanning, the OFD may continue to create new OST-objects. Those new + * created OST-objects will have larger IDs than the LFSCK known ones. + * So from the LFSCK view, it needs to re-load the last_id from disk + * file, and if the latest last_id is still smaller than the object's + * ID, then the LAST_ID file is real crashed. + * + * To make above mechanism to work, before OFD pre-create OST-objects, + * it needs to update the LAST_ID file firstly, otherwise, the LFSCK + * may cannot get latest last_id although new OST-object created. */ + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) { + tmp = cpu_to_le64(id + nr - 1); + dt_write_lock(env, oseq->os_lastid_obj, 0); + rc = dt_record_write(env, oseq->os_lastid_obj, + &info->fti_buf, &info->fti_off, th); + dt_write_unlock(env, oseq->os_lastid_obj); + if (rc != 0) + GOTO(trans_stop, rc); + } for (i = 0; i < nr; i++) { fo = batch[i]; LASSERT(fo); - if (likely(!ofd_object_exists(fo))) { + if (likely(!ofd_object_exists(fo) && + !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) { next = ofd_object_child(fo); LASSERT(next != NULL); @@ -283,11 +310,24 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, } objects = i; - if (objects > 0) { + /* NOT all the wanted objects have been created, + * set the LAST_ID as the real created. */ + if (unlikely(objects < nr)) { + int rc1; + + info->fti_off = 0; tmp = cpu_to_le64(ofd_seq_last_oid(oseq)); - rc = dt_record_write(env, oseq->os_lastid_obj, - &info->fti_buf, &info->fti_off, th); + dt_write_lock(env, oseq->os_lastid_obj, 0); + rc1 = dt_record_write(env, oseq->os_lastid_obj, + &info->fti_buf, &info->fti_off, th); + dt_write_unlock(env, oseq->os_lastid_obj); + if (rc1 != 0) + CERROR("%s: fail to reset the LAST_ID for seq ("LPX64 + ") from "LPU64" to "LPU64"\n", ofd_name(ofd), + ostid_seq(&oseq->os_oi), id + nr - 1, + ofd_seq_last_oid(oseq)); } + trans_stop: ofd_trans_stop(env, ofd, th, rc); out: