X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Flocal_storage.c;h=7f4130400113d08a8b1d6338ee28ed59b97a4a3f;hb=bc71055256db623ba2062f8f299d8b603d89e0d9;hp=7b36943bdba23d9c3491b056ed23b8b9ee944226;hpb=4a88dc81ff314005e428cbfa1c931c71ff03c212;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/local_storage.c b/lustre/obdclass/local_storage.c index 7b36943..7f41304 100644 --- a/lustre/obdclass/local_storage.c +++ b/lustre/obdclass/local_storage.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * lustre/obdclass/local_storage.c @@ -208,7 +208,7 @@ int local_object_fid_generate(const struct lu_env *env, mutex_lock(&los->los_id_lock); fid->f_seq = los->los_seq; - fid->f_oid = los->los_last_oid++; + fid->f_oid = ++los->los_last_oid; fid->f_ver = 0; mutex_unlock(&los->los_id_lock); @@ -252,7 +252,7 @@ int local_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { struct dt_thread_info *dti = dt_info(env); - struct los_ondisk losd; + obd_id lastid; int rc; ENTRY; @@ -274,12 +274,11 @@ int local_object_create(const struct lu_env *env, /* update local oid number on disk so that * we know the last one used after reboot */ - losd.lso_magic = cpu_to_le32(LOS_MAGIC); - losd.lso_next_oid = cpu_to_le32(los->los_last_oid); + lastid = cpu_to_le64(los->los_last_oid); dti->dti_off = 0; - dti->dti_lb.lb_buf = &losd; - dti->dti_lb.lb_len = sizeof(losd); + dti->dti_lb.lb_buf = &lastid; + dti->dti_lb.lb_len = sizeof(lastid); rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off, th); mutex_unlock(&los->los_id_lock); @@ -436,11 +435,7 @@ struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env, rc = dt_lookup_dir(env, parent, name, &dti->dti_fid); if (rc == 0) { - /* name is found, get the object */ - if (!lu_fid_eq(fid, &dti->dti_fid)) - dto = ERR_PTR(-EINVAL); - else - dto = dt_locate(env, dt, fid); + dto = dt_locate(env, dt, &dti->dti_fid); } else if (rc != -ENOENT) { dto = ERR_PTR(rc); } else { @@ -566,6 +561,81 @@ local_index_find_or_create_with_fid(const struct lu_env *env, } EXPORT_SYMBOL(local_index_find_or_create_with_fid); +static int local_object_declare_unlink(const struct lu_env *env, + struct dt_device *dt, + struct dt_object *p, + struct dt_object *c, const char *name, + struct thandle *th) +{ + int rc; + + rc = dt_declare_delete(env, p, (const struct dt_key *)name, th); + if (rc < 0) + return rc; + + rc = dt_declare_ref_del(env, c, th); + if (rc < 0) + return rc; + + return dt_declare_destroy(env, c, th); +} + +int local_object_unlink(const struct lu_env *env, struct dt_device *dt, + struct dt_object *parent, const char *name) +{ + struct dt_thread_info *dti = dt_info(env); + struct dt_object *dto; + struct thandle *th; + int rc; + + ENTRY; + + rc = dt_lookup_dir(env, parent, name, &dti->dti_fid); + if (rc == -ENOENT) + RETURN(0); + else if (rc < 0) + RETURN(rc); + + dto = dt_locate(env, dt, &dti->dti_fid); + if (unlikely(IS_ERR(dto))) + RETURN(PTR_ERR(dto)); + + th = dt_trans_create(env, dt); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + rc = local_object_declare_unlink(env, dt, parent, dto, name, th); + if (rc < 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dt, th); + if (rc < 0) + GOTO(stop, rc); + + dt_write_lock(env, dto, 0); + rc = dt_delete(env, parent, (struct dt_key *)name, th, BYPASS_CAPA); + if (rc < 0) + GOTO(unlock, rc); + + rc = dt_ref_del(env, dto, th); + if (rc < 0) { + rc = dt_insert(env, parent, + (const struct dt_rec *)&dti->dti_fid, + (const struct dt_key *)name, th, BYPASS_CAPA, 1); + GOTO(unlock, rc); + } + + rc = dt_destroy(env, dto, th); +unlock: + dt_write_unlock(env, dto); +stop: + dt_trans_stop(env, dt, th); +out: + lu_object_put_nocache(env, &dto->do_lu); + return rc; +} +EXPORT_SYMBOL(local_object_unlink); + struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq) { struct local_oid_storage *los, *ret = NULL; @@ -589,6 +659,79 @@ void dt_los_put(struct local_oid_storage *los) return; } +/* after Lustre 2.3 release there may be old file to store last generated FID + * If such file exists then we have to read its content + */ +int lastid_compat_check(const struct lu_env *env, struct dt_device *dev, + __u64 lastid_seq, __u32 *first_oid, struct ls_device *ls) +{ + struct dt_thread_info *dti = dt_info(env); + struct dt_object *root = NULL; + struct los_ondisk losd; + struct dt_object *o = NULL; + int rc = 0; + + rc = dt_root_get(env, dev, &dti->dti_fid); + if (rc) + return rc; + + root = ls_locate(env, ls, &dti->dti_fid); + if (IS_ERR(root)) + return PTR_ERR(root); + + /* find old last_id file */ + snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-"LPX64"-lastid", + lastid_seq); + rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid); + lu_object_put_nocache(env, &root->do_lu); + if (rc == -ENOENT) { + /* old llog lastid accessed by FID only */ + if (lastid_seq != FID_SEQ_LLOG) + return 0; + dti->dti_fid.f_seq = FID_SEQ_LLOG; + dti->dti_fid.f_oid = 1; + dti->dti_fid.f_ver = 0; + o = ls_locate(env, ls, &dti->dti_fid); + if (IS_ERR(o)) + return PTR_ERR(o); + + if (!dt_object_exists(o)) { + lu_object_put_nocache(env, &o->do_lu); + return 0; + } + CDEBUG(D_INFO, "Found old llog lastid file\n"); + } else if (rc < 0) { + return rc; + } else { + CDEBUG(D_INFO, "Found old lastid file for sequence "LPX64"\n", + lastid_seq); + o = ls_locate(env, ls, &dti->dti_fid); + if (IS_ERR(o)) + return PTR_ERR(o); + } + /* let's read seq-NNNNNN-lastid file value */ + LASSERT(dt_object_exists(o)); + dti->dti_off = 0; + dti->dti_lb.lb_buf = &losd; + dti->dti_lb.lb_len = sizeof(losd); + dt_read_lock(env, o, 0); + rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off); + dt_read_unlock(env, o); + lu_object_put_nocache(env, &o->do_lu); + if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) { + CERROR("%s: wrong content of seq-"LPX64"-lastid file, magic %x\n", + o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq, + le32_to_cpu(losd.lso_magic)); + return -EINVAL; + } else if (rc < 0) { + CERROR("%s: failed to read seq-"LPX64"-lastid: rc = %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq, rc); + return rc; + } + *first_oid = le32_to_cpu(losd.lso_next_oid); + return rc; +} + /** * Initialize local OID storage for required sequence. * That may be needed for services that uses local files and requires @@ -612,11 +755,11 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, { struct dt_thread_info *dti = dt_info(env); struct ls_device *ls; - struct los_ondisk losd; - struct dt_object *root = NULL; + obd_id lastid; struct dt_object *o = NULL; struct thandle *th; - int rc; + __u32 first_oid = fid_oid(first_fid); + int rc = 0; ENTRY; @@ -640,36 +783,25 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, cfs_atomic_inc(&ls->ls_refcount); cfs_list_add(&(*los)->los_list, &ls->ls_los_list); - rc = dt_root_get(env, dev, &dti->dti_fid); - if (rc) - GOTO(out_los, rc); - - root = ls_locate(env, ls, &dti->dti_fid); - if (IS_ERR(root)) - GOTO(out_los, rc = PTR_ERR(root)); - - snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%Lx-lastid", - fid_seq(first_fid)); - rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid); - if (rc != 0 && rc != -ENOENT) - GOTO(out_los, rc); - - /* initialize data allowing to generate new fids, - * literally we need a sequence */ - if (rc == 0) - o = ls_locate(env, ls, &dti->dti_fid); - else - o = ls_locate(env, ls, first_fid); + /* Use {seq, 0, 0} to create the LAST_ID file for every + * sequence. OIDs start at LUSTRE_FID_INIT_OID. + */ + dti->dti_fid.f_seq = fid_seq(first_fid); + dti->dti_fid.f_oid = LUSTRE_FID_LASTID_OID; + dti->dti_fid.f_ver = 0; + o = ls_locate(env, ls, &dti->dti_fid); if (IS_ERR(o)) GOTO(out_los, rc = PTR_ERR(o)); - dt_write_lock(env, o, 0); if (!dt_object_exists(o)) { - LASSERT(rc == -ENOENT); + rc = lastid_compat_check(env, dev, fid_seq(first_fid), + &first_oid, ls); + if (rc < 0) + GOTO(out_los, rc); th = dt_trans_create(env, dev); if (IS_ERR(th)) - GOTO(out_lock, rc = PTR_ERR(th)); + GOTO(out_los, rc = PTR_ERR(th)); dti->dti_attr.la_valid = LA_MODE | LA_TYPE; dti->dti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; @@ -680,21 +812,7 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (rc) GOTO(out_trans, rc); - rc = dt_declare_insert(env, root, - (const struct dt_rec *)lu_object_fid(&o->do_lu), - (const struct dt_key *)dti->dti_buf, - th); - if (rc) - GOTO(out_trans, rc); - - dti->dti_lb.lb_buf = NULL; - dti->dti_lb.lb_len = sizeof(dti->dti_lma); - rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, - 0, th); - if (rc) - GOTO(out_trans, rc); - - rc = dt_declare_record_write(env, o, sizeof(losd), 0, th); + rc = dt_declare_record_write(env, o, sizeof(lastid), 0, th); if (rc) GOTO(out_trans, rc); @@ -702,56 +820,42 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (rc) GOTO(out_trans, rc); - LASSERT(!dt_object_exists(o)); - rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, th); + dt_write_lock(env, o, 0); + if (dt_object_exists(o)) + GOTO(out_lock, rc = 0); + + rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, + th); if (rc) - GOTO(out_trans, rc); - LASSERT(dt_object_exists(o)); + GOTO(out_lock, rc); - losd.lso_magic = cpu_to_le32(LOS_MAGIC); - losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1); + lastid = cpu_to_le64(first_oid); dti->dti_off = 0; - dti->dti_lb.lb_buf = &losd; - dti->dti_lb.lb_len = sizeof(losd); + dti->dti_lb.lb_buf = &lastid; + dti->dti_lb.lb_len = sizeof(lastid); rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th); if (rc) - GOTO(out_trans, rc); -#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0) -#error "fix this before release" -#endif - /* - * there is one technical debt left in Orion: - * proper hanlding of named vs no-name objects. - * Llog objects have name always as they are placed in O/d/... - */ - if (fid_seq(lu_object_fid(&o->do_lu)) != FID_SEQ_LLOG) { - rc = dt_insert(env, root, - (const struct dt_rec *)first_fid, - (const struct dt_key *)dti->dti_buf, - th, BYPASS_CAPA, 1); - if (rc) - GOTO(out_trans, rc); - } + GOTO(out_lock, rc); +out_lock: + dt_write_unlock(env, o); out_trans: dt_trans_stop(env, dev, th); } else { dti->dti_off = 0; - dti->dti_lb.lb_buf = &losd; - dti->dti_lb.lb_len = sizeof(losd); + dti->dti_lb.lb_buf = &lastid; + dti->dti_lb.lb_len = sizeof(lastid); + dt_read_lock(env, o, 0); rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off); - if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) { - CERROR("local storage file "DFID" is corrupted\n", - PFID(first_fid)); + dt_read_unlock(env, o); + if (rc == 0 && le64_to_cpu(lastid) > OBIF_MAX_OID) { + CERROR("%s: bad oid "LPU64" is read from LAST_ID\n", + o->do_lu.lo_dev->ld_obd->obd_name, + le64_to_cpu(lastid)); rc = -EINVAL; } } -out_lock: - dt_write_unlock(env, o); out_los: - if (root != NULL && !IS_ERR(root)) - lu_object_put_nocache(env, &root->do_lu); - if (rc != 0) { cfs_list_del(&(*los)->los_list); cfs_atomic_dec(&ls->ls_refcount); @@ -761,8 +865,14 @@ out_los: lu_object_put_nocache(env, &o->do_lu); } else { (*los)->los_seq = fid_seq(first_fid); - (*los)->los_last_oid = le32_to_cpu(losd.lso_next_oid); + (*los)->los_last_oid = le64_to_cpu(lastid); (*los)->los_obj = o; + /* Read value should not be less than initial one + * but possible after upgrade from older fs. + * In this case just switch to the first_oid in memory and + * it will be updated on disk with first object generated */ + if ((*los)->los_last_oid < first_oid) + (*los)->los_last_oid = first_oid; } out: mutex_unlock(&ls->ls_los_mutex);