X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Flocal_storage.c;h=05132b0f83dd1d6ecb6ec66dc28958101f1e91cd;hb=0c5ad4b6df5bf35b291842fc6d42c2720246a026;hp=7f5fd79f959b680bd6e4f77f6d9f9fdec4807821;hpb=8ef1b1f59948970b828d889ba8afabc2cc57e8cf;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/local_storage.c b/lustre/obdclass/local_storage.c index 7f5fd79..05132b0 100644 --- a/lustre/obdclass/local_storage.c +++ b/lustre/obdclass/local_storage.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * lustre/obdclass/local_storage.c @@ -35,7 +35,7 @@ #include "local_storage.h" /* all initialized local storages on this node are linked on this */ -static CFS_LIST_HEAD(ls_list_head); +static LIST_HEAD(ls_list_head); static DEFINE_MUTEX(ls_list_mutex); static int ls_object_init(const struct lu_env *env, struct lu_object *o, @@ -47,7 +47,7 @@ static int ls_object_init(const struct lu_env *env, struct lu_object *o, ENTRY; - ls = container_of0(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev); + ls = container_of(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev); under = &ls->ls_osd->dd_lu_dev; below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); if (below == NULL) @@ -65,17 +65,18 @@ static void ls_object_free(const struct lu_env *env, struct lu_object *o) dt_object_fini(&obj->ls_obj); lu_object_header_fini(h); - OBD_FREE_PTR(obj); + OBD_FREE_PRE(obj, sizeof(*obj), "kfreed"); + kfree_rcu(obj, ls_header.loh_rcu); } -struct lu_object_operations ls_lu_obj_ops = { +static struct lu_object_operations ls_lu_obj_ops = { .loo_object_init = ls_object_init, .loo_object_free = ls_object_free, }; -struct lu_object *ls_object_alloc(const struct lu_env *env, - const struct lu_object_header *_h, - struct lu_device *d) +static struct lu_object *ls_object_alloc(const struct lu_env *env, + const struct lu_object_header *_h, + struct lu_device *d) { struct lu_object_header *h; struct ls_object *o; @@ -108,9 +109,9 @@ static struct ls_device *__ls_find_dev(struct dt_device *dev) { struct ls_device *ls, *ret = NULL; - cfs_list_for_each_entry(ls, &ls_list_head, ls_linkage) { + list_for_each_entry(ls, &ls_list_head, ls_linkage) { if (ls->ls_osd == dev) { - cfs_atomic_inc(&ls->ls_refcount); + atomic_inc(&ls->ls_refcount); ret = ls; break; } @@ -155,8 +156,8 @@ struct ls_device *ls_device_get(struct dt_device *dev) if (ls == NULL) GOTO(out_ls, ls = ERR_PTR(-ENOMEM)); - cfs_atomic_set(&ls->ls_refcount, 1); - CFS_INIT_LIST_HEAD(&ls->ls_los_list); + atomic_set(&ls->ls_refcount, 1); + INIT_LIST_HEAD(&ls->ls_los_list); mutex_init(&ls->ls_los_mutex); ls->ls_osd = dev; @@ -167,7 +168,7 @@ struct ls_device *ls_device_get(struct dt_device *dev) ls->ls_top_dev.dd_lu_dev.ld_site = dev->dd_lu_dev.ld_site; /* finally add ls to the list */ - cfs_list_add(&ls->ls_linkage, &ls_list_head); + list_add(&ls->ls_linkage, &ls_list_head); out_ls: mutex_unlock(&ls_list_mutex); RETURN(ls); @@ -176,13 +177,13 @@ out_ls: void ls_device_put(const struct lu_env *env, struct ls_device *ls) { LASSERT(env); - if (!cfs_atomic_dec_and_test(&ls->ls_refcount)) + if (!atomic_dec_and_test(&ls->ls_refcount)) return; mutex_lock(&ls_list_mutex); - if (cfs_atomic_read(&ls->ls_refcount) == 0) { - LASSERT(cfs_list_empty(&ls->ls_los_list)); - cfs_list_del(&ls->ls_linkage); + if (atomic_read(&ls->ls_refcount) == 0) { + LASSERT(list_empty(&ls->ls_los_list)); + list_del(&ls->ls_linkage); lu_site_purge(env, ls->ls_top_dev.dd_lu_dev.ld_site, ~0); lu_device_fini(&ls->ls_top_dev.dd_lu_dev); OBD_FREE_PTR(ls); @@ -208,7 +209,7 @@ int local_object_fid_generate(const struct lu_env *env, mutex_lock(&los->los_id_lock); fid->f_seq = los->los_seq; - fid->f_oid = los->los_last_oid++; + fid->f_oid = ++los->los_last_oid; fid->f_ver = 0; mutex_unlock(&los->los_id_lock); @@ -229,8 +230,10 @@ int local_object_declare_create(const struct lu_env *env, /* update fid generation file */ if (los != NULL) { LASSERT(dt_object_exists(los->los_obj)); + dti->dti_lb.lb_buf = NULL; + dti->dti_lb.lb_len = sizeof(struct los_ondisk); rc = dt_declare_record_write(env, los->los_obj, - sizeof(struct los_ondisk), 0, th); + &dti->dti_lb, 0, th); if (rc) RETURN(rc); } @@ -252,7 +255,7 @@ int local_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { struct dt_thread_info *dti = dt_info(env); - struct los_ondisk losd; + u64 lastid; int rc; ENTRY; @@ -274,12 +277,11 @@ int local_object_create(const struct lu_env *env, /* update local oid number on disk so that * we know the last one used after reboot */ - losd.lso_magic = cpu_to_le32(LOS_MAGIC); - losd.lso_next_oid = cpu_to_le32(los->los_last_oid); + lastid = cpu_to_le64(los->los_last_oid); dti->dti_off = 0; - dti->dti_lb.lb_buf = &losd; - dti->dti_lb.lb_len = sizeof(losd); + dti->dti_lb.lb_buf = &lastid; + dti->dti_lb.lb_len = sizeof(lastid); rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off, th); mutex_unlock(&los->los_id_lock); @@ -290,20 +292,28 @@ int local_object_create(const struct lu_env *env, /* * Create local named object (file, directory or index) in parent directory. */ -struct dt_object *__local_file_create(const struct lu_env *env, - const struct lu_fid *fid, - struct local_oid_storage *los, - struct ls_device *ls, - struct dt_object *parent, - const char *name, struct lu_attr *attr, - struct dt_object_format *dof) +static struct dt_object *__local_file_create(const struct lu_env *env, + const struct lu_fid *fid, + struct local_oid_storage *los, + struct ls_device *ls, + struct dt_object *parent, + const char *name, + struct lu_attr *attr, + struct dt_object_format *dof) { - struct dt_thread_info *dti = dt_info(env); + struct dt_thread_info *dti = dt_info(env); + struct lu_object_conf *conf = &dti->dti_conf; + struct dt_insert_rec *rec = &dti->dti_dt_rec; struct dt_object *dto; struct thandle *th; int rc; - dto = ls_locate(env, ls, fid); + /* We know that the target object does not exist, to be created, + * then give some hints - LOC_F_NEW to help low layer to handle + * that efficiently and properly. */ + memset(conf, 0, sizeof(*conf)); + conf->loc_flags = LOC_F_NEW; + dto = ls_locate(env, ls, fid, conf); if (unlikely(IS_ERR(dto))) RETURN(dto); @@ -320,19 +330,49 @@ struct dt_object *__local_file_create(const struct lu_env *env, GOTO(trans_stop, rc); if (dti->dti_dof.dof_type == DFT_DIR) { - dt_declare_ref_add(env, dto, th); - dt_declare_ref_add(env, parent, th); + rc = dt_declare_ref_add(env, dto, th); + if (rc < 0) + GOTO(trans_stop, rc); + + rc = dt_declare_ref_add(env, parent, th); + if (rc < 0) + GOTO(trans_stop, rc); } - rc = dt_declare_insert(env, parent, (void *)fid, (void *)name, th); + rec->rec_fid = fid; + rec->rec_type = attr->la_mode & S_IFMT; + rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)name, th); if (rc) GOTO(trans_stop, rc); + if (dti->dti_dof.dof_type == DFT_DIR) { + if (!dt_try_as_dir(env, dto)) + GOTO(trans_stop, rc = -ENOTDIR); + + rec->rec_type = S_IFDIR; + rec->rec_fid = fid; + rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)".", th); + if (rc != 0) + GOTO(trans_stop, rc); + + rec->rec_fid = lu_object_fid(&parent->do_lu); + rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)"..", th); + if (rc != 0) + GOTO(trans_stop, rc); + + rc = dt_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(trans_stop, rc); + } + rc = dt_trans_start_local(env, ls->ls_osd, th); if (rc) GOTO(trans_stop, rc); - dt_write_lock(env, dto, 0); + dt_write_lock(env, dto, DT_SRC_CHILD); if (dt_object_exists(dto)) GOTO(unlock, rc = 0); @@ -344,23 +384,28 @@ struct dt_object *__local_file_create(const struct lu_env *env, LASSERT(dt_object_exists(dto)); if (dti->dti_dof.dof_type == DFT_DIR) { - if (!dt_try_as_dir(env, dto)) - GOTO(destroy, rc = -ENOTDIR); + + rec->rec_type = S_IFDIR; + rec->rec_fid = fid; /* Add "." and ".." for newly created dir */ - rc = dt_insert(env, dto, (void *)fid, (void *)".", th, - BYPASS_CAPA, 1); - if (rc) + rc = dt_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)".", th); + if (rc != 0) GOTO(destroy, rc); + dt_ref_add(env, dto, th); - rc = dt_insert(env, dto, (void *)lu_object_fid(&parent->do_lu), - (void *)"..", th, BYPASS_CAPA, 1); - if (rc) + rec->rec_fid = lu_object_fid(&parent->do_lu); + rc = dt_insert(env, dto, (const struct dt_rec *)rec, + (const struct dt_key *)"..", th); + if (rc != 0) GOTO(destroy, rc); } - dt_write_lock(env, parent, 0); - rc = dt_insert(env, parent, (const struct dt_rec *)fid, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + rec->rec_fid = fid; + rec->rec_type = dto->do_lu.lo_header->loh_attr; + dt_write_lock(env, parent, DT_SRC_PARENT); + rc = dt_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)name, th); if (dti->dti_dof.dof_type == DFT_DIR) dt_ref_add(env, parent, th); dt_write_unlock(env, parent); @@ -375,12 +420,34 @@ trans_stop: dt_trans_stop(env, ls->ls_osd, th); out: if (rc) { - lu_object_put_nocache(env, &dto->do_lu); + dt_object_put_nocache(env, dto); dto = ERR_PTR(rc); } RETURN(dto); } +struct dt_object *local_file_find(const struct lu_env *env, + struct local_oid_storage *los, + struct dt_object *parent, + const char *name) +{ + struct dt_thread_info *dti = dt_info(env); + struct dt_object *dto; + int rc; + + LASSERT(parent); + + rc = dt_lookup_dir(env, parent, name, &dti->dti_fid); + if (!rc) + dto = ls_locate(env, dt2ls_dev(los->los_dev), + &dti->dti_fid, NULL); + else + dto = ERR_PTR(rc); + + return dto; +} +EXPORT_SYMBOL(local_file_find); + /* * Look up and create (if it does not exist) a local named file or directory in * parent directory. @@ -394,29 +461,21 @@ struct dt_object *local_file_find_or_create(const struct lu_env *env, struct dt_object *dto; int rc; - LASSERT(parent); + dto = local_file_find(env, los, parent, name); + if (!IS_ERR(dto) || PTR_ERR(dto) != -ENOENT) + return dto; - rc = dt_lookup_dir(env, parent, name, &dti->dti_fid); - if (rc == 0) - /* name is found, get the object */ - dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid); - else if (rc != -ENOENT) - dto = ERR_PTR(rc); - else { - rc = local_object_fid_generate(env, los, &dti->dti_fid); - if (rc < 0) { - dto = ERR_PTR(rc); - } else { - /* create the object */ - dti->dti_attr.la_valid = LA_MODE; - dti->dti_attr.la_mode = mode; - dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT); - dto = __local_file_create(env, &dti->dti_fid, los, - dt2ls_dev(los->los_dev), - parent, name, &dti->dti_attr, - &dti->dti_dof); - } - } + rc = local_object_fid_generate(env, los, &dti->dti_fid); + if (rc) + return ERR_PTR(rc); + + /* create the object */ + dti->dti_attr.la_valid = LA_MODE; + dti->dti_attr.la_mode = mode; + dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT); + dto = __local_file_create(env, &dti->dti_fid, los, + dt2ls_dev(los->los_dev), parent, name, + &dti->dti_attr, &dti->dti_dof); return dto; } EXPORT_SYMBOL(local_file_find_or_create); @@ -444,7 +503,7 @@ struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env, ls = ls_device_get(dt); if (IS_ERR(ls)) { - dto = ERR_PTR(PTR_ERR(ls)); + dto = ERR_CAST(ls); } else { /* create the object */ dti->dti_attr.la_valid = LA_MODE; @@ -457,7 +516,7 @@ struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env, * have to open the object in other device stack */ if (!IS_ERR(dto)) { dti->dti_fid = dto->do_lu.lo_header->loh_fid; - lu_object_put_nocache(env, &dto->do_lu); + dt_object_put_nocache(env, dto); dto = dt_locate(env, dt, &dti->dti_fid); } ls_device_put(env, ls); @@ -486,7 +545,8 @@ struct dt_object *local_index_find_or_create(const struct lu_env *env, rc = dt_lookup_dir(env, parent, name, &dti->dti_fid); if (rc == 0) { /* name is found, get the object */ - dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid); + dto = ls_locate(env, dt2ls_dev(los->los_dev), + &dti->dti_fid, NULL); } else if (rc != -ENOENT) { dto = ERR_PTR(rc); } else { @@ -538,7 +598,7 @@ local_index_find_or_create_with_fid(const struct lu_env *env, ls = ls_device_get(dt); if (IS_ERR(ls)) { - dto = ERR_PTR(PTR_ERR(ls)); + dto = ERR_CAST(ls); } else { /* create the object */ dti->dti_attr.la_valid = LA_MODE; @@ -552,7 +612,7 @@ local_index_find_or_create_with_fid(const struct lu_env *env, * have to open the object in other device stack */ if (!IS_ERR(dto)) { dti->dti_fid = dto->do_lu.lo_header->loh_fid; - lu_object_put_nocache(env, &dto->do_lu); + dt_object_put_nocache(env, dto); dto = dt_locate(env, dt, &dti->dti_fid); } ls_device_put(env, ls); @@ -614,15 +674,18 @@ int local_object_unlink(const struct lu_env *env, struct dt_device *dt, GOTO(stop, rc); dt_write_lock(env, dto, 0); - rc = dt_delete(env, parent, (struct dt_key *)name, th, BYPASS_CAPA); + rc = dt_delete(env, parent, (struct dt_key *)name, th); if (rc < 0) GOTO(unlock, rc); rc = dt_ref_del(env, dto, th); if (rc < 0) { - rc = dt_insert(env, parent, - (const struct dt_rec *)&dti->dti_fid, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + struct dt_insert_rec *rec = &dti->dti_dt_rec; + + rec->rec_fid = &dti->dti_fid; + rec->rec_type = dto->do_lu.lo_header->loh_attr; + rc = dt_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)name, th); GOTO(unlock, rc); } @@ -632,7 +695,7 @@ unlock: stop: dt_trans_stop(env, dt, th); out: - lu_object_put_nocache(env, &dto->do_lu); + dt_object_put_nocache(env, dto); return rc; } EXPORT_SYMBOL(local_object_unlink); @@ -641,9 +704,9 @@ struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq) { struct local_oid_storage *los, *ret = NULL; - cfs_list_for_each_entry(los, &ls->ls_los_list, los_list) { + list_for_each_entry(los, &ls->ls_los_list, los_list) { if (los->los_seq == seq) { - cfs_atomic_inc(&los->los_refcount); + atomic_inc(&los->los_refcount); ret = los; break; } @@ -653,11 +716,84 @@ struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq) void dt_los_put(struct local_oid_storage *los) { - if (cfs_atomic_dec_and_test(&los->los_refcount)) + if (atomic_dec_and_test(&los->los_refcount)) /* should never happen, only local_oid_storage_fini should * drop refcount to zero */ LBUG(); - return; +} + +/* after Lustre 2.3 release there may be old file to store last generated FID + * If such file exists then we have to read its content + */ +static int lastid_compat_check(const struct lu_env *env, struct dt_device *dev, + __u64 lastid_seq, __u32 *first_oid, + struct ls_device *ls) +{ + struct dt_thread_info *dti = dt_info(env); + struct dt_object *root = NULL; + struct los_ondisk losd; + struct dt_object *o = NULL; + int rc = 0; + + rc = dt_root_get(env, dev, &dti->dti_fid); + if (rc) + return rc; + + root = ls_locate(env, ls, &dti->dti_fid, NULL); + if (IS_ERR(root)) + return PTR_ERR(root); + + /* find old last_id file */ + snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%#llx-lastid", + lastid_seq); + rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid); + dt_object_put_nocache(env, root); + if (rc == -ENOENT) { + /* old llog lastid accessed by FID only */ + if (lastid_seq != FID_SEQ_LLOG) + return 0; + dti->dti_fid.f_seq = FID_SEQ_LLOG; + dti->dti_fid.f_oid = 1; + dti->dti_fid.f_ver = 0; + o = ls_locate(env, ls, &dti->dti_fid, NULL); + if (IS_ERR(o)) + return PTR_ERR(o); + + if (!dt_object_exists(o)) { + dt_object_put_nocache(env, o); + return 0; + } + CDEBUG(D_INFO, "Found old llog lastid file\n"); + } else if (rc < 0) { + return rc; + } else { + CDEBUG(D_INFO, "Found old lastid file for sequence %#llx\n", + lastid_seq); + o = ls_locate(env, ls, &dti->dti_fid, NULL); + if (IS_ERR(o)) + return PTR_ERR(o); + } + /* let's read seq-NNNNNN-lastid file value */ + LASSERT(dt_object_exists(o)); + dti->dti_off = 0; + dti->dti_lb.lb_buf = &losd; + dti->dti_lb.lb_len = sizeof(losd); + dt_read_lock(env, o, 0); + rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off); + dt_read_unlock(env, o); + if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) { + CERROR("%s: wrong content of seq-%#llx-lastid file, magic %x\n", + o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq, + le32_to_cpu(losd.lso_magic)); + rc = -EINVAL; + } else if (rc < 0) { + CERROR("%s: failed to read seq-%#llx-lastid: rc = %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq, rc); + } + dt_object_put_nocache(env, o); + if (rc == 0) + *first_oid = le32_to_cpu(losd.lso_next_oid); + return rc; } /** @@ -683,11 +819,11 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, { struct dt_thread_info *dti = dt_info(env); struct ls_device *ls; - struct los_ondisk losd; - struct dt_object *root = NULL; + u64 lastid; struct dt_object *o = NULL; struct thandle *th; - int rc; + __u32 first_oid = fid_oid(first_fid); + int rc = 0; ENTRY; @@ -705,36 +841,27 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (*los == NULL) GOTO(out, rc = -ENOMEM); - cfs_atomic_set(&(*los)->los_refcount, 1); + atomic_set(&(*los)->los_refcount, 1); mutex_init(&(*los)->los_id_lock); (*los)->los_dev = &ls->ls_top_dev; - cfs_atomic_inc(&ls->ls_refcount); - cfs_list_add(&(*los)->los_list, &ls->ls_los_list); - - rc = dt_root_get(env, dev, &dti->dti_fid); - if (rc) - GOTO(out_los, rc); - - root = ls_locate(env, ls, &dti->dti_fid); - if (IS_ERR(root)) - GOTO(out_los, rc = PTR_ERR(root)); - - /* initialize data allowing to generate new fids, - * literally we need a sequence */ - snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%Lx-lastid", - fid_seq(first_fid)); - rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid); - if (rc == -ENOENT) - dti->dti_fid = *first_fid; - else if (rc < 0) - GOTO(out_los, rc); - - o = ls_locate(env, ls, &dti->dti_fid); + atomic_inc(&ls->ls_refcount); + list_add(&(*los)->los_list, &ls->ls_los_list); + + /* Use {seq, 0, 0} to create the LAST_ID file for every + * sequence. OIDs start at LUSTRE_FID_INIT_OID. + */ + dti->dti_fid.f_seq = fid_seq(first_fid); + dti->dti_fid.f_oid = LUSTRE_FID_LASTID_OID; + dti->dti_fid.f_ver = 0; + o = ls_locate(env, ls, &dti->dti_fid, NULL); if (IS_ERR(o)) GOTO(out_los, rc = PTR_ERR(o)); - LASSERT(fid_seq(&dti->dti_fid) == fid_seq(first_fid)); + if (!dt_object_exists(o)) { - LASSERT(rc == -ENOENT); + rc = lastid_compat_check(env, dev, fid_seq(first_fid), + &first_oid, ls); + if (rc < 0) + GOTO(out_los, rc); th = dt_trans_create(env, dev); if (IS_ERR(th)) @@ -749,14 +876,13 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (rc) GOTO(out_trans, rc); - rc = dt_declare_insert(env, root, - (const struct dt_rec *)&dti->dti_fid, - (const struct dt_key *)dti->dti_buf, - th); - if (rc) - GOTO(out_trans, rc); + lastid = cpu_to_le64(first_oid); - rc = dt_declare_record_write(env, o, sizeof(losd), 0, th); + dti->dti_off = 0; + dti->dti_lb.lb_buf = &lastid; + dti->dti_lb.lb_len = sizeof(lastid); + rc = dt_declare_record_write(env, o, &dti->dti_lb, dti->dti_off, + th); if (rc) GOTO(out_trans, rc); @@ -764,7 +890,6 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (rc) GOTO(out_trans, rc); - dt_write_lock(env, root, 0); dt_write_lock(env, o, 0); if (dt_object_exists(o)) GOTO(out_lock, rc = 0); @@ -774,54 +899,45 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (rc) GOTO(out_lock, rc); - losd.lso_magic = cpu_to_le32(LOS_MAGIC); - losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1); - - dti->dti_off = 0; - dti->dti_lb.lb_buf = &losd; - dti->dti_lb.lb_len = sizeof(losd); rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th); if (rc) GOTO(out_lock, rc); - rc = dt_insert(env, root, - (const struct dt_rec *)&dti->dti_fid, - (const struct dt_key *)dti->dti_buf, - th, BYPASS_CAPA, 1); - if (rc) - GOTO(out_lock, rc); out_lock: dt_write_unlock(env, o); - dt_write_unlock(env, root); out_trans: dt_trans_stop(env, dev, th); } else { dti->dti_off = 0; - dti->dti_lb.lb_buf = &losd; - dti->dti_lb.lb_len = sizeof(losd); + dti->dti_lb.lb_buf = &lastid; + dti->dti_lb.lb_len = sizeof(lastid); dt_read_lock(env, o, 0); rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off); dt_read_unlock(env, o); - if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) { - CERROR("local storage file "DFID" is corrupted\n", - PFID(first_fid)); + if (rc == 0 && le64_to_cpu(lastid) > OBIF_MAX_OID) { + CERROR("%s: bad oid %llu is read from LAST_ID\n", + o->do_lu.lo_dev->ld_obd->obd_name, + le64_to_cpu(lastid)); rc = -EINVAL; } } out_los: - if (root != NULL && !IS_ERR(root)) - lu_object_put_nocache(env, &root->do_lu); - if (rc != 0) { - cfs_list_del(&(*los)->los_list); - cfs_atomic_dec(&ls->ls_refcount); + list_del(&(*los)->los_list); + atomic_dec(&ls->ls_refcount); OBD_FREE_PTR(*los); *los = NULL; if (o != NULL && !IS_ERR(o)) - lu_object_put_nocache(env, &o->do_lu); + dt_object_put_nocache(env, o); } else { (*los)->los_seq = fid_seq(first_fid); - (*los)->los_last_oid = le32_to_cpu(losd.lso_next_oid); + (*los)->los_last_oid = le64_to_cpu(lastid); (*los)->los_obj = o; + /* Read value should not be less than initial one + * but possible after upgrade from older fs. + * In this case just switch to the first_oid in memory and + * it will be updated on disk with first object generated */ + if ((*los)->los_last_oid < first_oid) + (*los)->los_last_oid = first_oid; } out: mutex_unlock(&ls->ls_los_mutex); @@ -831,24 +947,26 @@ out: EXPORT_SYMBOL(local_oid_storage_init); void local_oid_storage_fini(const struct lu_env *env, - struct local_oid_storage *los) + struct local_oid_storage *los) { struct ls_device *ls; - if (!cfs_atomic_dec_and_test(&los->los_refcount)) - return; - LASSERT(env); LASSERT(los->los_dev); ls = dt2ls_dev(los->los_dev); + /* Take the mutex before decreasing the reference to avoid race + * conditions as described in LU-4721. */ mutex_lock(&ls->ls_los_mutex); - if (cfs_atomic_read(&los->los_refcount) == 0) { - if (los->los_obj) - lu_object_put_nocache(env, &los->los_obj->do_lu); - cfs_list_del(&los->los_list); - OBD_FREE_PTR(los); + if (!atomic_dec_and_test(&los->los_refcount)) { + mutex_unlock(&ls->ls_los_mutex); + return; } + + if (los->los_obj) + dt_object_put_nocache(env, los->los_obj); + list_del(&los->los_list); + OBD_FREE_PTR(los); mutex_unlock(&ls->ls_los_mutex); ls_device_put(env, ls); }