X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fofd%2Fofd_objects.c;h=39179690aa55a12dd858da77ee065db8ff5f5292;hp=b05972d8f1800d2c9890b1f99ff99eee9f6df109;hb=0209add4a5099817111c8576afe930d1e2daef03;hpb=fd3dcf04aa169560986a15880d0ced9b07adaad4 diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index b05972d..3917969 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -43,6 +43,7 @@ #include #include +#include #include "ofd_internal.h" @@ -84,7 +85,8 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, if (likely(!IS_ERR(o))) fo = ofd_obj(o); else - fo = (struct ofd_object *)o; /* return error */ + fo = ERR_CAST(o); /* return error */ + RETURN(fo); } @@ -103,7 +105,7 @@ struct ofd_object *ofd_object_find_or_create(const struct lu_env *env, dto = dt_find_or_create(env, ofd->ofd_osd, fid, &info->fti_dof, attr); if (IS_ERR(dto)) - RETURN((struct ofd_object *)dto); + RETURN(ERR_CAST(dto)); fo_obj = lu_object_locate(dto->do_lu.lo_header, ofd->ofd_dt_dev.dd_lu_dev.ld_type); @@ -151,6 +153,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, struct dt_object *next; struct thandle *th; struct ofd_object **batch; + struct lu_fid *fid = &info->fti_fid; obd_id tmp; int rc; int i; @@ -160,14 +163,15 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, ENTRY; /* Don't create objects beyond the valid range for this SEQ */ - if (unlikely(fid_seq_is_mdt0(oseq->os_seq) && (id + nr) >= IDIF_MAX_OID)) { + if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && + (id + nr) >= IDIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n", - ofd_name(ofd), id, oseq->os_seq); + ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); - } else if (unlikely(!fid_seq_is_mdt0(oseq->os_seq) && + } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) && (id + nr) >= OBIF_MAX_OID)) { CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n", - ofd_name(ofd), id, oseq->os_seq); + ofd_name(ofd), id, ostid_seq(&oseq->os_oi)); RETURN(rc = -ENOSPC); } @@ -192,21 +196,21 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, info->fti_attr.la_mtime = 0; info->fti_attr.la_ctime = 0; + LASSERT(id != 0); + /* prepare objects */ + *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu); for (i = 0; i < nr; i++) { - info->fti_ostid.oi_id = id + i; - info->fti_ostid.oi_seq = oseq->os_seq; - - rc = fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0); - if (rc) { + rc = fid_set_id(fid, id + i); + if (rc != 0) { if (i == 0) - GOTO(out, rc = PTR_ERR(fo)); + GOTO(out, rc); nr = i; break; } - fo = ofd_object_find(env, ofd, &info->fti_fid); + fo = ofd_object_find(env, ofd, fid); if (IS_ERR(fo)) { if (i == 0) GOTO(out, rc = PTR_ERR(fo)); @@ -228,7 +232,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, th->th_sync |= sync; - rc = dt_declare_record_write(env, oseq->os_lastid_obj, sizeof(tmp), + rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf, info->fti_off, th); if (rc) GOTO(trans_stop, rc); @@ -240,8 +244,8 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, if (unlikely(ofd_object_exists(fo))) { /* object may exist being re-created by write replay */ CDEBUG(D_INODE, "object "LPX64"/"LPX64" exists: " - DFID"\n", oseq->os_seq, id, - PFID(&info->fti_fid)); + DFID"\n", ostid_seq(&oseq->os_oi), id, + PFID(lu_object_fid(&fo->ofo_obj.do_lu))); continue; } @@ -260,13 +264,48 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, if (rc) GOTO(trans_stop, rc); - CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(&info->fti_fid)); + CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n", + ofd_name(ofd), PFID(fid), nr); + + LASSERT(nr > 0); + + /* When the LFSCK scanning the whole device to verify the LAST_ID file + * consistency, it will load the last_id into RAM firstly, and compare + * the last_id with each OST-object's ID. If the later one is larger, + * then it will regard the LAST_ID file crashed. But during the LFSCK + * scanning, the OFD may continue to create new OST-objects. Those new + * created OST-objects will have larger IDs than the LFSCK known ones. + * So from the LFSCK view, it needs to re-load the last_id from disk + * file, and if the latest last_id is still smaller than the object's + * ID, then the LAST_ID file is real crashed. + * + * To make above mechanism to work, before OFD pre-create OST-objects, + * it needs to update the LAST_ID file firstly, otherwise, the LFSCK + * may cannot get latest last_id although new OST-object created. */ + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) { + tmp = cpu_to_le64(id + nr - 1); + dt_write_lock(env, oseq->os_lastid_obj, 0); + rc = dt_record_write(env, oseq->os_lastid_obj, + &info->fti_buf, &info->fti_off, th); + dt_write_unlock(env, oseq->os_lastid_obj); + if (rc != 0) + GOTO(trans_stop, rc); + } for (i = 0; i < nr; i++) { fo = batch[i]; LASSERT(fo); - if (likely(!ofd_object_exists(fo))) { + /* Only the new created objects need to be recorded. */ + if (ofd->ofd_osd->dd_record_fid_accessed) { + lfsck_pack_rfa(&ofd_info(env)->fti_lr, + lu_object_fid(&fo->ofo_obj.do_lu)); + lfsck_in_notify(env, ofd->ofd_osd, + &ofd_info(env)->fti_lr); + } + + if (likely(!ofd_object_exists(fo) && + !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) { next = ofd_object_child(fo); LASSERT(next != NULL); @@ -280,11 +319,24 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, } objects = i; - if (objects > 0) { + /* NOT all the wanted objects have been created, + * set the LAST_ID as the real created. */ + if (unlikely(objects < nr)) { + int rc1; + + info->fti_off = 0; tmp = cpu_to_le64(ofd_seq_last_oid(oseq)); - rc = dt_record_write(env, oseq->os_lastid_obj, - &info->fti_buf, &info->fti_off, th); + dt_write_lock(env, oseq->os_lastid_obj, 0); + rc1 = dt_record_write(env, oseq->os_lastid_obj, + &info->fti_buf, &info->fti_off, th); + dt_write_unlock(env, oseq->os_lastid_obj); + if (rc1 != 0) + CERROR("%s: fail to reset the LAST_ID for seq ("LPX64 + ") from "LPU64" to "LPU64"\n", ofd_name(ofd), + ostid_seq(&oseq->os_oi), id + nr - 1, + ofd_seq_last_oid(oseq)); } + trans_stop: ofd_trans_stop(env, ofd, th, rc); out: