From: Alex Zhuravlev Date: Fri, 6 Jan 2012 05:01:40 +0000 (+0800) Subject: LU-909 osd: changes to osd api X-Git-Tag: 2.1.54~14 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=22464d1230ed58461f51d881f512d5e16644a735 LU-909 osd: changes to osd api the main purpose of the patch is to get declare methods in the API and to teach osd-based devices to use that - new declaration methods for each changing method - explicit destroy method: ->do_ref_del() never destroy object - methods to access data in 0-copy manner: no actual implementation in this patch - mdd/fld use new methods to create/declare/start transactions - specific method to change/access version are removed: use xattr methods - ldiskfs osd tracks all declarations and asserts if caller is trying to call changing method w/o proper declaration Change-Id: I473c0c2950c1920abb2fef1dac465c08f35522ea Signed-off-by: Alex Zhuravlev Signed-off-by: Hongchao Zhang Reviewed-on: http://review.whamcloud.com/1669 Tested-by: Hudson Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin Tested-by: Maloo --- diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4 index 8bd7f6b..4fea8cc 100644 --- a/lustre/autoconf/lustre-core.m4 +++ b/lustre/autoconf/lustre-core.m4 @@ -510,7 +510,10 @@ AC_TRY_RUN([ AC_MSG_RESULT([ACL size $acl_size]) AC_DEFINE_UNQUOTED(XATTR_ACL_SIZE, AS_TR_SH([$acl_size]), [size of xattr acl]) ],[ - AC_ERROR([ACL size can't computed]) + AC_ERROR([ACL size can't be computed]) +],[ + AC_MSG_RESULT([can't check ACL size, make it 260]) + AC_DEFINE_UNQUOTED(XATTR_ACL_SIZE,260) ]) CFLAGS="$tmp_flags" ]) diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index df839c9..01e9244 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -411,18 +411,6 @@ static int cml_object_sync(const struct lu_env *env, struct md_object *mo) RETURN(rc); } -static dt_obj_version_t cml_version_get(const struct lu_env *env, - struct md_object *mo) -{ - return mo_version_get(env, md_object_next(mo)); -} - -static void cml_version_set(const struct lu_env *env, struct md_object *mo, - dt_obj_version_t version) -{ - return mo_version_set(env, md_object_next(mo), version); -} - static const struct md_object_operations cml_mo_ops = { .moo_permission = cml_permission, .moo_attr_get = cml_attr_get, @@ -441,8 +429,6 @@ static const struct md_object_operations cml_mo_ops = { .moo_changelog = cml_changelog, .moo_capa_get = cml_capa_get, .moo_object_sync = cml_object_sync, - .moo_version_get = cml_version_get, - .moo_version_set = cml_version_set, .moo_path = cml_path, .moo_file_lock = cml_file_lock, .moo_file_unlock = cml_file_unlock, @@ -1140,28 +1126,6 @@ static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo, return -EREMOTE; } -/** - * cmr moo_version_get(). - */ -static dt_obj_version_t cmr_version_get(const struct lu_env *env, - struct md_object *mo) -{ - /** Don't check remote object version */ - return 0; -} - - -/** - * cmr moo_version_set(). - * No need to update remote object version here, it is done as a part - * of reintegration of partial operation on the remote server. - */ -static void cmr_version_set(const struct lu_env *env, struct md_object *mo, - dt_obj_version_t version) -{ - return; -} - /** Set of md_object_operations for cmr. */ static const struct md_object_operations cmr_mo_ops = { .moo_permission = cmr_permission, @@ -1181,8 +1145,6 @@ static const struct md_object_operations cmr_mo_ops = { .moo_changelog = cmr_changelog, .moo_capa_get = cmr_capa_get, .moo_object_sync = cmr_object_sync, - .moo_version_get = cmr_version_get, - .moo_version_set = cmr_version_set, .moo_path = cmr_path, .moo_file_lock = cmr_file_lock, .moo_file_unlock = cmr_file_unlock, diff --git a/lustre/fid/fid_internal.h b/lustre/fid/fid_internal.h index 96fb114..7908db8 100644 --- a/lustre/fid/fid_internal.h +++ b/lustre/fid/fid_internal.h @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -48,7 +49,6 @@ #ifdef __KERNEL__ struct seq_thread_info { struct req_capsule *sti_pill; - struct txn_param sti_txn; struct lu_seq_range sti_space; struct lu_buf sti_buf; }; diff --git a/lustre/fid/fid_store.c b/lustre/fid/fid_store.c index c09cf0cf..71e4f4d 100644 --- a/lustre/fid/fid_store.c +++ b/lustre/fid/fid_store.c @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -88,6 +89,26 @@ void seq_update_cb(struct lu_env *env, struct thandle *th, OBD_FREE_PTR(ccb); } +struct thandle *seq_store_trans_create(struct lu_server_seq *seq, + const struct lu_env *env) +{ + struct dt_device *dt_dev; + + dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev); + return dt_trans_create(env, dt_dev); +} + +int seq_store_trans_start(struct lu_server_seq *seq, const struct lu_env *env, + struct thandle *th) +{ + struct dt_device *dt_dev; + ENTRY; + + dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev); + + return dt_trans_start(env, dt_dev, th); +} + int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq) { struct seq_update_callback *ccb; @@ -106,6 +127,20 @@ int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq) return rc; } +int seq_declare_store_write(struct lu_server_seq *seq, + const struct lu_env *env, + struct thandle *th) +{ + struct dt_object *dt_obj = seq->lss_obj; + int rc; + ENTRY; + + rc = dt_obj->do_body_ops->dbo_declare_write(env, dt_obj, + sizeof(struct lu_seq_range), + 0, th); + return rc; +} + /* This function implies that caller takes care about locking. */ int seq_store_write(struct lu_server_seq *seq, const struct lu_env *env, @@ -141,36 +176,44 @@ int seq_store_write(struct lu_server_seq *seq, int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq, struct lu_seq_range *out, int sync) { - struct seq_thread_info *info; struct dt_device *dt_dev; struct thandle *th; int rc; - int credits = SEQ_TXN_STORE_CREDITS; ENTRY; dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev); - info = lu_context_key_get(&env->le_ctx, &seq_thread_key); - if (out != NULL) - credits += FLD_TXN_INDEX_INSERT_CREDITS; - - txn_param_init(&info->sti_txn, credits); - th = dt_trans_start(env, dt_dev, &info->sti_txn); + th = seq_store_trans_create(seq, env); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + rc = seq_declare_store_write(seq, env, th); + if (rc) + GOTO(exit, rc); + + if (out != NULL) { + rc = fld_declare_server_create(seq->lss_site->ms_server_fld, + env, th); + if (rc) + GOTO(exit, rc); + } + + rc = seq_store_trans_start(seq, env, th); + if (rc) + GOTO(exit, rc); + rc = seq_store_write(seq, env, th); if (rc) { CERROR("%s: Can't write space data, rc %d\n", seq->lss_name, rc); - GOTO(out,rc); + GOTO(exit,rc); } else if (out != NULL) { rc = fld_server_create(seq->lss_site->ms_server_fld, env, out, th); if (rc) { CERROR("%s: Can't Update fld database, rc %d\n", seq->lss_name, rc); - GOTO(out,rc); + GOTO(exit,rc); } } @@ -181,7 +224,7 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq, sync = !!seq_update_cb_add(th, seq); th->th_sync |= sync; -out: +exit: dt_trans_stop(env, dt_dev, th); return rc; } diff --git a/lustre/fld/fld_handler.c b/lustre/fld/fld_handler.c index ae9520a..872358c 100644 --- a/lustre/fld/fld_handler.c +++ b/lustre/fld/fld_handler.c @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -111,6 +112,30 @@ static void __exit fld_mod_exit(void) } } +int fld_declare_server_create(struct lu_server_fld *fld, + const struct lu_env *env, + struct thandle *th) +{ + struct dt_object *dt_obj = fld->lsf_obj; + int rc; + + ENTRY; + + /* for ldiskfs OSD it's enough to declare operation with any ops + * with DMU we'll probably need to specify exact key/value */ + rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th); + if (rc) + GOTO(out, rc); + rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th); + if (rc) + GOTO(out, rc); + rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj, + NULL, NULL, th); +out: + RETURN(rc); +} +EXPORT_SYMBOL(fld_declare_server_create); + /** * Insert FLD index entry and update FLD cache. * @@ -245,7 +270,6 @@ out: RETURN(rc); } - EXPORT_SYMBOL(fld_server_create); /** diff --git a/lustre/fld/fld_index.c b/lustre/fld/fld_index.c index 997e781..a3277db 100644 --- a/lustre/fld/fld_index.c +++ b/lustre/fld/fld_index.c @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -112,19 +113,24 @@ static struct dt_rec *fld_rec(const struct lu_env *env, RETURN((void *)rec); } -struct thandle* fld_trans_start(struct lu_server_fld *fld, - const struct lu_env *env, int credit) +struct thandle *fld_trans_create(struct lu_server_fld *fld, + const struct lu_env *env) { - struct fld_thread_info *info; struct dt_device *dt_dev; - struct txn_param *p; dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); - info = lu_context_key_get(&env->le_ctx, &fld_thread_key); - p = &info->fti_txn_param; - txn_param_init(p, credit); - return dt_dev->dd_ops->dt_trans_start(env, dt_dev, p); + return dt_dev->dd_ops->dt_trans_create(env, dt_dev); +} + +int fld_trans_start(struct lu_server_fld *fld, + const struct lu_env *env, struct thandle *th) +{ + struct dt_device *dt_dev; + + dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); + + return dt_dev->dd_ops->dt_trans_start(env, dt_dev, th); } void fld_trans_stop(struct lu_server_fld *fld, @@ -136,6 +142,26 @@ void fld_trans_stop(struct lu_server_fld *fld, dt_dev->dd_ops->dt_trans_stop(env, th); } +int fld_declare_index_create(struct lu_server_fld *fld, + const struct lu_env *env, + const struct lu_seq_range *range, + struct thandle *th) +{ + struct dt_object *dt_obj = fld->lsf_obj; + seqno_t start; + int rc; + + ENTRY; + + start = range->lsr_start; + LASSERT(range_is_sane(range)); + + rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj, + fld_rec(env, range), + fld_key(env, start), th); + RETURN(rc); +} + /** * insert range in fld store. * @@ -192,9 +218,8 @@ int fld_index_delete(struct lu_server_fld *fld, ENTRY; - rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, - fld_key(env, seq), th, - BYPASS_CAPA); + rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, fld_key(env, seq), + th, BYPASS_CAPA); CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n", fld->lsf_name, PRANGE(range), rc); @@ -254,11 +279,22 @@ static int fld_insert_igif_fld(struct lu_server_fld *fld, { struct thandle *th; int rc; - ENTRY; - th = fld_trans_start(fld, env, FLD_TXN_INDEX_INSERT_CREDITS); + + /* FLD_TXN_INDEX_INSERT_CREDITS */ + th = fld_trans_create(fld, env); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th); + if (rc) { + fld_trans_stop(fld, env, th); + RETURN(rc); + } + rc = fld_trans_start(fld, env, th); + if (rc) { + fld_trans_stop(fld, env, th); + RETURN(rc); + } rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th); fld_trans_stop(fld, env, th); diff --git a/lustre/fld/fld_internal.h b/lustre/fld/fld_internal.h index 034d987..6179b3e 100644 --- a/lustre/fld/fld_internal.h +++ b/lustre/fld/fld_internal.h @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -146,12 +147,13 @@ struct fld_thread_info { struct lu_seq_range fti_rec; struct lu_seq_range fti_lrange; struct lu_seq_range fti_irange; - struct txn_param fti_txn_param; }; -struct thandle* fld_trans_start(struct lu_server_fld *fld, - const struct lu_env *env, int credit); +struct thandle *fld_trans_create(struct lu_server_fld *fld, + const struct lu_env *env); +int fld_trans_start(struct lu_server_fld *fld, + const struct lu_env *env, struct thandle *th); void fld_trans_stop(struct lu_server_fld *fld, const struct lu_env *env, struct thandle* th); diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index db232da..d7db320 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -65,11 +65,12 @@ struct proc_dir_entry; struct lustre_cfg; struct thandle; -struct txn_param; struct dt_device; struct dt_object; struct dt_index_features; struct dt_quota_ctxt; +struct niobuf_local; +struct niobuf_remote; typedef enum { MNTOPT_USERXATTR = 0x00000001, @@ -82,6 +83,16 @@ struct dt_device_param { unsigned ddp_block_shift; mntopt_t ddp_mntopts; unsigned ddp_max_ea_size; + void *ddp_mnt; /* XXX: old code can retrieve mnt -bzzz */ + int ddp_mount_type; + unsigned long long ddp_maxbytes; + /* percentage of available space to reserve for grant error margin */ + int ddp_grant_reserved; + /* per-inode space consumption */ + short ddp_inodespace; + /* per-fragment grant overhead to be used by client for grant + * calculation */ + int ddp_grant_frag; }; /** @@ -100,25 +111,6 @@ struct dt_txn_commit_cb { }; /** - * Basic transaction credit op - */ -enum dt_txn_op { - DTO_INDEX_INSERT, - DTO_INDEX_DELETE, - DTO_IDNEX_UPDATE, - DTO_OBJECT_CREATE, - DTO_OBJECT_DELETE, - DTO_ATTR_SET_BASE, - DTO_XATTR_SET, - DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */ - DTO_WRITE_BASE, - DTO_WRITE_BLOCK, - DTO_ATTR_SET_CHOWN, - - DTO_NR -}; - -/** * Operations on dt device. */ struct dt_device_operations { @@ -126,17 +118,21 @@ struct dt_device_operations { * Return device-wide statistics. */ int (*dt_statfs)(const struct lu_env *env, - struct dt_device *dev, cfs_kstatfs_t *sfs); + struct dt_device *dev, cfs_kstatfs_t *osfs); + /** + * Create transaction, described by \a param. + */ + struct thandle *(*dt_trans_create)(const struct lu_env *env, + struct dt_device *dev); /** * Start transaction, described by \a param. */ - struct thandle *(*dt_trans_start)(const struct lu_env *env, - struct dt_device *dev, - struct txn_param *param); + int (*dt_trans_start)(const struct lu_env *env, + struct dt_device *dev, struct thandle *th); /** * Finish previously started transaction. */ - void (*dt_trans_stop)(const struct lu_env *env, + int (*dt_trans_stop)(const struct lu_env *env, struct thandle *th); /** * Add commit callback to the transaction. @@ -182,12 +178,6 @@ struct dt_device_operations { void (*dt_init_quota_ctxt)(const struct lu_env *env, struct dt_device *dev, struct dt_quota_ctxt *ctxt, void *data); - - /** - * get transaction credits for given \a op. - */ - int (*dt_credit_get)(const struct lu_env *env, struct dt_device *dev, - enum dt_txn_op); }; struct dt_index_features { @@ -272,7 +262,6 @@ struct dt_object_format { enum dt_format_type dt_mode_to_dft(__u32 mode); -/** Version type. May differ in DMU and ldiskfs */ typedef __u64 dt_obj_version_t; /** @@ -312,6 +301,10 @@ struct dt_object_operations { * * precondition: dt_object_exists(dt); */ + int (*do_declare_attr_set)(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *handle); int (*do_attr_set)(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, @@ -332,6 +325,11 @@ struct dt_object_operations { * * precondition: dt_object_exists(dt); */ + int (*do_declare_xattr_set)(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *handle); int (*do_xattr_set)(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *handle, @@ -341,6 +339,9 @@ struct dt_object_operations { * * precondition: dt_object_exists(dt); */ + int (*do_declare_xattr_del)(const struct lu_env *env, + struct dt_object *dt, + const char *name, struct thandle *handle); int (*do_xattr_del)(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *handle, @@ -371,6 +372,12 @@ struct dt_object_operations { * precondition: !dt_object_exists(dt); * postcondition: ergo(result == 0, dt_object_exists(dt)); */ + int (*do_declare_create)(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th); int (*do_create)(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, @@ -378,6 +385,17 @@ struct dt_object_operations { struct thandle *th); /** + Destroy object on this device + * precondition: !dt_object_exists(dt); + * postcondition: ergo(result == 0, dt_object_exists(dt)); + */ + int (*do_declare_destroy)(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th); + int (*do_destroy)(const struct lu_env *env, struct dt_object *dt, + struct thandle *th); + + /** * Announce that this object is going to be used as an index. This * operation check that object supports indexing operations and * installs appropriate dt_index_operations vector on success. @@ -392,13 +410,17 @@ struct dt_object_operations { * Add nlink of the object * precondition: dt_object_exists(dt); */ - void (*do_ref_add)(const struct lu_env *env, + int (*do_declare_ref_add)(const struct lu_env *env, + struct dt_object *dt, struct thandle *th); + int (*do_ref_add)(const struct lu_env *env, struct dt_object *dt, struct thandle *th); /** * Del nlink of the object * precondition: dt_object_exists(dt); */ - void (*do_ref_del)(const struct lu_env *env, + int (*do_declare_ref_del)(const struct lu_env *env, + struct dt_object *dt, struct thandle *th); + int (*do_ref_del)(const struct lu_env *env, struct dt_object *dt, struct thandle *th); struct obd_capa *(*do_capa_get)(const struct lu_env *env, @@ -406,10 +428,6 @@ struct dt_object_operations { struct lustre_capa *old, __u64 opc); int (*do_object_sync)(const struct lu_env *, struct dt_object *); - dt_obj_version_t (*do_version_get)(const struct lu_env *env, - struct dt_object *dt); - void (*do_version_set)(const struct lu_env *env, struct dt_object *dt, - dt_obj_version_t new_version); /** * Get object info of next level. Currently, only get inode from osd. * This is only used by quota b=16542 @@ -432,10 +450,66 @@ struct dt_body_operations { /** * precondition: dt_object_exists(dt); */ + ssize_t (*dbo_declare_write)(const struct lu_env *env, + struct dt_object *dt, + const loff_t size, loff_t pos, + struct thandle *handle); ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, struct thandle *handle, struct lustre_capa *capa, int ignore_quota); + /* + * methods for zero-copy IO + */ + + /* + * precondition: dt_object_exists(dt); + * returns: + * < 0 - error code + * = 0 - illegal + * > 0 - number of local buffers prepared + */ + int (*dbo_bufs_get)(const struct lu_env *env, struct dt_object *dt, + loff_t pos, ssize_t len, struct niobuf_local *lb, + int rw, struct lustre_capa *capa); + /* + * precondition: dt_object_exists(dt); + */ + int (*dbo_bufs_put)(const struct lu_env *env, struct dt_object *dt, + struct niobuf_local *lb, int nr); + /* + * precondition: dt_object_exists(dt); + */ + int (*dbo_write_prep)(const struct lu_env *env, struct dt_object *dt, + struct niobuf_local *lb, int nr); + /* + * precondition: dt_object_exists(dt); + */ + int (*dbo_declare_write_commit)(const struct lu_env *env, + struct dt_object *dt, + struct niobuf_local *, + int, struct thandle *); + /* + * precondition: dt_object_exists(dt); + */ + int (*dbo_write_commit)(const struct lu_env *env, struct dt_object *dt, + struct niobuf_local *, int, struct thandle *); + /* + * precondition: dt_object_exists(dt); + */ + int (*dbo_read_prep)(const struct lu_env *env, struct dt_object *dt, + struct niobuf_local *lnb, int nr); + int (*dbo_fiemap_get)(const struct lu_env *env, struct dt_object *dt, + struct ll_user_fiemap *fm); + /** + * Punch object's content + * precondition: regular object, not index + */ + int (*do_declare_punch)(const struct lu_env *, struct dt_object *, + __u64, __u64, struct thandle *th); + int (*do_punch)(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end, struct thandle *th, + struct lustre_capa *capa); }; /** @@ -466,6 +540,11 @@ struct dt_index_operations { /** * precondition: dt_object_exists(dt); */ + int (*dio_declare_insert)(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle); int (*dio_insert)(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, struct thandle *handle, struct lustre_capa *capa, @@ -473,6 +552,10 @@ struct dt_index_operations { /** * precondition: dt_object_exists(dt); */ + int (*dio_declare_delete)(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle); int (*dio_delete)(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *handle, struct lustre_capa *capa); @@ -504,12 +587,14 @@ struct dt_index_operations { const struct dt_it *di); int (*rec)(const struct lu_env *env, const struct dt_it *di, - struct lu_dirent *lde, + struct dt_rec *rec, __u32 attr); __u64 (*store)(const struct lu_env *env, const struct dt_it *di); int (*load)(const struct lu_env *env, const struct dt_it *di, __u64 hash); + int (*key_rec)(const struct lu_env *env, + const struct dt_it *di, void* key_rec); } dio_it; }; @@ -546,6 +631,12 @@ struct dt_object { const struct dt_index_operations *do_index_ops; }; +static inline struct dt_object *lu2dt(struct lu_object *l) +{ + LASSERT(l == NULL || IS_ERR(l) || lu_device_is_dt(l->lo_dev)); + return container_of0(l, struct dt_object, do_lu); +} + int dt_object_init(struct dt_object *obj, struct lu_object_header *h, struct lu_device *d); @@ -556,23 +647,6 @@ static inline int dt_object_exists(const struct dt_object *dt) return lu_object_exists(&dt->do_lu); } -struct txn_param { - /** number of blocks this transaction will modify */ - unsigned int tp_credits; -}; - -static inline void txn_param_init(struct txn_param *p, unsigned int credits) -{ - memset(p, 0, sizeof(*p)); - p->tp_credits = credits; -} - -static inline void txn_param_credit_add(struct txn_param *p, - unsigned int credits) -{ - p->tp_credits += credits; -} - /** * This is the general purpose transaction handle. * 1. Transaction Life Cycle @@ -591,14 +665,21 @@ struct thandle { /** the dt device on which the transactions are executed */ struct dt_device *th_dev; + /** additional tags (layers can add in declare) */ + __u32 th_tags; + /** context for this transaction, tag is LCT_TX_HANDLE */ struct lu_context th_ctx; /** the last operation result in this transaction. * this value is used in recovery */ __s32 th_result; + /** whether we need sync commit */ - int th_sync; + int th_sync:1; + + /* local transation, no need to inform other layers */ + int th_local:1; }; /** @@ -614,7 +695,7 @@ struct thandle { */ struct dt_txn_callback { int (*dtc_txn_start)(const struct lu_env *env, - struct txn_param *param, void *cookie); + struct thandle *txn, void *cookie); int (*dtc_txn_stop)(const struct lu_env *env, struct thandle *txn, void *cookie); void (*dtc_txn_commit)(struct thandle *txn, void *cookie); @@ -627,7 +708,7 @@ void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb); void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb); int dt_txn_hook_start(const struct lu_env *env, - struct dt_device *dev, struct txn_param *param); + struct dt_device *dev, struct thandle *txn); int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn); void dt_txn_hook_commit(struct thandle *txn); @@ -657,19 +738,12 @@ struct dt_object *dt_locate(const struct lu_env *env, struct dt_device *dev, const struct lu_fid *fid); -static inline dt_obj_version_t do_version_get(const struct lu_env *env, - struct dt_object *o) -{ - LASSERT(o->do_ops->do_version_get); - return o->do_ops->do_version_get(env, o); -} +int dt_declare_version_set(const struct lu_env *env, struct dt_object *o, + struct thandle *th); +void dt_version_set(const struct lu_env *env, struct dt_object *o, + dt_obj_version_t version, struct thandle *th); +dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o); -static inline void do_version_set(const struct lu_env *env, - struct dt_object *o, dt_obj_version_t v) -{ - LASSERT(o->do_ops->do_version_set); - return o->do_ops->do_version_set(env, o, v); -} int dt_record_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, loff_t *pos); @@ -677,17 +751,32 @@ int dt_record_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, struct thandle *th); -static inline struct thandle *dt_trans_start(const struct lu_env *env, - struct dt_device *d, - struct txn_param *p) +static inline struct thandle *dt_trans_create(const struct lu_env *env, + struct dt_device *d) { - LASSERT(d->dd_ops->dt_trans_start); - return d->dd_ops->dt_trans_start(env, d, p); + LASSERT(d->dd_ops->dt_trans_create); + return d->dd_ops->dt_trans_create(env, d); } -static inline void dt_trans_stop(const struct lu_env *env, +static inline int dt_trans_start(const struct lu_env *env, struct dt_device *d, struct thandle *th) { + LASSERT(d->dd_ops->dt_trans_start); + return d->dd_ops->dt_trans_start(env, d, th); +} + +/* for this transaction hooks shouldn't be called */ +static inline int dt_trans_start_local(const struct lu_env *env, + struct dt_device *d, struct thandle *th) +{ + LASSERT(d->dd_ops->dt_trans_start); + th->th_local = 1; + return d->dd_ops->dt_trans_start(env, d, th); +} + +static inline int dt_trans_stop(const struct lu_env *env, + struct dt_device *d, struct thandle *th) +{ LASSERT(d->dd_ops->dt_trans_stop); return d->dd_ops->dt_trans_stop(env, th); } @@ -699,4 +788,465 @@ static inline int dt_trans_cb_add(struct thandle *th, return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb); } /** @} dt */ + + +static inline int dt_declare_record_write(const struct lu_env *env, + struct dt_object *dt, + int size, loff_t pos, + struct thandle *th) +{ + int rc; + + LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); + LASSERT(th != NULL); + rc = dt->do_body_ops->dbo_declare_write(env, dt, size, pos, th); + return rc; +} + +static inline int dt_declare_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_declare_create); + return dt->do_ops->do_declare_create(env, dt, attr, hint, dof, th); +} + +static inline int dt_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_create); + return dt->do_ops->do_create(env, dt, attr, hint, dof, th); +} + +static inline int dt_declare_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_declare_destroy); + return dt->do_ops->do_declare_destroy(env, dt, th); +} + +static inline int dt_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_destroy); + return dt->do_ops->do_destroy(env, dt, th); +} + +static inline void dt_read_lock(const struct lu_env *env, + struct dt_object *dt, + unsigned role) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_read_lock); + dt->do_ops->do_read_lock(env, dt, role); +} + +static inline void dt_write_lock(const struct lu_env *env, + struct dt_object *dt, + unsigned role) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_write_lock); + dt->do_ops->do_write_lock(env, dt, role); +} + +static inline void dt_read_unlock(const struct lu_env *env, + struct dt_object *dt) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_read_unlock); + dt->do_ops->do_read_unlock(env, dt); +} + +static inline void dt_write_unlock(const struct lu_env *env, + struct dt_object *dt) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_write_unlock); + dt->do_ops->do_write_unlock(env, dt); +} + +static inline int dt_write_locked(const struct lu_env *env, + struct dt_object *dt) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_write_locked); + return dt->do_ops->do_write_locked(env, dt); +} + +static inline int dt_attr_get(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *la, void *arg) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_attr_get); + return dt->do_ops->do_attr_get(env, dt, la, arg); +} + +static inline int dt_declare_attr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *la, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_declare_attr_set); + return dt->do_ops->do_declare_attr_set(env, dt, la, th); +} + +static inline int dt_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *la, struct thandle *th, + struct lustre_capa *capa) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_attr_set); + return dt->do_ops->do_attr_set(env, dt, la, th, capa); +} + +static inline int dt_declare_ref_add(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_declare_ref_add); + return dt->do_ops->do_declare_ref_add(env, dt, th); +} + +static inline int dt_ref_add(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_ref_add); + return dt->do_ops->do_ref_add(env, dt, th); +} + +static inline int dt_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_declare_ref_del); + return dt->do_ops->do_declare_ref_del(env, dt, th); +} + +static inline int dt_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_ref_del); + return dt->do_ops->do_ref_del(env, dt, th); +} + +static inline struct obd_capa *dt_capa_get(const struct lu_env *env, + struct dt_object *dt, + struct lustre_capa *old, __u64 opc) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_ref_del); + return dt->do_ops->do_capa_get(env, dt, old, opc); +} + +static inline int dt_bufs_get(const struct lu_env *env, struct dt_object *d, + struct niobuf_remote *rnb, + struct niobuf_local *lnb, int rw, + struct lustre_capa *capa) +{ + LASSERT(d); + LASSERT(d->do_body_ops); + LASSERT(d->do_body_ops->dbo_bufs_get); + return d->do_body_ops->dbo_bufs_get(env, d, rnb->offset, + rnb->len, lnb, rw, capa); +} + +static inline int dt_bufs_put(const struct lu_env *env, struct dt_object *d, + struct niobuf_local *lnb, int n) +{ + LASSERT(d); + LASSERT(d->do_body_ops); + LASSERT(d->do_body_ops->dbo_bufs_put); + return d->do_body_ops->dbo_bufs_put(env, d, lnb, n); +} + +static inline int dt_write_prep(const struct lu_env *env, struct dt_object *d, + struct niobuf_local *lnb, int n) +{ + LASSERT(d); + LASSERT(d->do_body_ops); + LASSERT(d->do_body_ops->dbo_write_prep); + return d->do_body_ops->dbo_write_prep(env, d, lnb, n); +} + +static inline int dt_declare_write_commit(const struct lu_env *env, + struct dt_object *d, + struct niobuf_local *lnb, + int n, struct thandle *th) +{ + LASSERTF(d != NULL, "dt is NULL when we want to declare write\n"); + LASSERT(th != NULL); + return d->do_body_ops->dbo_declare_write_commit(env, d, lnb, n, th); +} + + +static inline int dt_write_commit(const struct lu_env *env, + struct dt_object *d, struct niobuf_local *lnb, + int n, struct thandle *th) +{ + LASSERT(d); + LASSERT(d->do_body_ops); + LASSERT(d->do_body_ops->dbo_write_commit); + return d->do_body_ops->dbo_write_commit(env, d, lnb, n, th); +} + +static inline int dt_read_prep(const struct lu_env *env, struct dt_object *d, + struct niobuf_local *lnb, int n) +{ + LASSERT(d); + LASSERT(d->do_body_ops); + LASSERT(d->do_body_ops->dbo_read_prep); + return d->do_body_ops->dbo_read_prep(env, d, lnb, n); +} + +static inline int dt_declare_punch(const struct lu_env *env, + struct dt_object *dt, __u64 start, + __u64 end, struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_body_ops); + LASSERT(dt->do_body_ops->do_declare_punch); + return dt->do_body_ops->do_declare_punch(env, dt, start, end, th); +} + +static inline int dt_punch(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end, struct thandle *th, + struct lustre_capa *capa) +{ + LASSERT(dt); + LASSERT(dt->do_body_ops); + LASSERT(dt->do_body_ops->do_punch); + return dt->do_body_ops->do_punch(env, dt, start, end, th, capa); +} + +static inline int dt_fiemap_get(const struct lu_env *env, struct dt_object *d, + struct ll_user_fiemap *fm) +{ + LASSERT(d); + if (d->do_body_ops == NULL) + return -EPROTO; + LASSERT(d->do_body_ops->dbo_fiemap_get); + return d->do_body_ops->dbo_fiemap_get(env, d, fm); +} + +static inline int dt_statfs(const struct lu_env *env, struct dt_device *dev, + cfs_kstatfs_t *osfs) +{ + LASSERT(dev); + LASSERT(dev->dd_ops); + LASSERT(dev->dd_ops->dt_statfs); + return dev->dd_ops->dt_statfs(env, dev, osfs); +} + +static inline int dt_root_get(const struct lu_env *env, struct dt_device *dev, + struct lu_fid *f) +{ + LASSERT(dev); + LASSERT(dev->dd_ops); + LASSERT(dev->dd_ops->dt_root_get); + return dev->dd_ops->dt_root_get(env, dev, f); +} + +static inline void dt_conf_get(const struct lu_env *env, + const struct dt_device *dev, + struct dt_device_param *param) +{ + LASSERT(dev); + LASSERT(dev->dd_ops); + LASSERT(dev->dd_ops->dt_conf_get); + return dev->dd_ops->dt_conf_get(env, dev, param); +} + +static inline int dt_sync(const struct lu_env *env, struct dt_device *dev) +{ + LASSERT(dev); + LASSERT(dev->dd_ops); + LASSERT(dev->dd_ops->dt_sync); + return dev->dd_ops->dt_sync(env, dev); +} + +static inline void dt_ro(const struct lu_env *env, struct dt_device *dev) +{ + LASSERT(dev); + LASSERT(dev->dd_ops); + LASSERT(dev->dd_ops->dt_ro); + return dev->dd_ops->dt_ro(env, dev); +} + +static inline int dt_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_index_ops); + LASSERT(dt->do_index_ops->dio_declare_insert); + return dt->do_index_ops->dio_declare_insert(env, dt, rec, key, th); +} + +static inline int dt_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th, + struct lustre_capa *capa, + int noquota) +{ + LASSERT(dt); + LASSERT(dt->do_index_ops); + LASSERT(dt->do_index_ops->dio_insert); + return dt->do_index_ops->dio_insert(env, dt, rec, key, th, + capa, noquota); +} + +static inline int dt_declare_xattr_del(const struct lu_env *env, + struct dt_object *dt, + const char *name, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_declare_xattr_del); + return dt->do_ops->do_declare_xattr_del(env, dt, name, th); +} + +static inline int dt_xattr_del(const struct lu_env *env, + struct dt_object *dt, const char *name, + struct thandle *th, + struct lustre_capa *capa) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_xattr_del); + return dt->do_ops->do_xattr_del(env, dt, name, th, capa); +} + +static inline int dt_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_declare_xattr_set); + return dt->do_ops->do_declare_xattr_set(env, dt, buf, name, fl, th); +} + +static inline int dt_xattr_set(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + const char *name, int fl, struct thandle *th, + struct lustre_capa *capa) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_xattr_set); + return dt->do_ops->do_xattr_set(env, dt, buf, name, fl, th, capa); +} + +static inline int dt_xattr_get(const struct lu_env *env, + struct dt_object *dt, struct lu_buf *buf, + const char *name, struct lustre_capa *capa) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_xattr_get); + return dt->do_ops->do_xattr_get(env, dt, buf, name, capa); +} + +static inline int dt_xattr_list(const struct lu_env *env, + struct dt_object *dt, struct lu_buf *buf, + struct lustre_capa *capa) +{ + LASSERT(dt); + LASSERT(dt->do_ops); + LASSERT(dt->do_ops->do_xattr_list); + return dt->do_ops->do_xattr_list(env, dt, buf, capa); +} + +static inline int dt_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_index_ops); + LASSERT(dt->do_index_ops->dio_declare_delete); + return dt->do_index_ops->dio_declare_delete(env, dt, key, th); +} + +static inline int dt_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th, + struct lustre_capa *capa) +{ + LASSERT(dt); + LASSERT(dt->do_index_ops); + LASSERT(dt->do_index_ops->dio_delete); + return dt->do_index_ops->dio_delete(env, dt, key, th, capa); +} + +static inline int dt_commit_async(const struct lu_env *env, + struct dt_device *dev) +{ + LASSERT(dev); + LASSERT(dev->dd_ops); + LASSERT(dev->dd_ops->dt_commit_async); + return dev->dd_ops->dt_commit_async(env, dev); +} + +static inline int dt_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, + const struct dt_key *key, + struct lustre_capa *capa) +{ + int ret; + + LASSERT(dt); + LASSERT(dt->do_index_ops); + LASSERT(dt->do_index_ops->dio_lookup); + + ret = dt->do_index_ops->dio_lookup(env, dt, rec, key, capa); + if (ret > 0) + ret = 0; + else if (ret == 0) + ret = -ENOENT; + return ret; +} #endif /* __LUSTRE_DT_OBJECT_H */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index f030e2b..c7b341e 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1337,6 +1337,7 @@ struct lov_mds_md_v1 { /* LOV EA mds/wire data (little-endian) */ #define XATTR_NAME_LMA "trusted.lma" #define XATTR_NAME_LMV "trusted.lmv" #define XATTR_NAME_LINK "trusted.link" +#define XATTR_NAME_VERSION "trusted.version" struct lov_mds_md_v3 { /* LOV EA mds/wire data (little-endian) */ diff --git a/lustre/include/lustre_fld.h b/lustre/include/lustre_fld.h index a0328d1..256879c 100644 --- a/lustre/include/lustre_fld.h +++ b/lustre/include/lustre_fld.h @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -155,6 +156,10 @@ int fld_server_init(struct lu_server_fld *fld, void fld_server_fini(struct lu_server_fld *fld, const struct lu_env *env); +int fld_declare_server_create(struct lu_server_fld *fld, + const struct lu_env *env, + struct thandle *th); + int fld_server_create(struct lu_server_fld *fld, const struct lu_env *env, struct lu_seq_range *add_range, diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 0346116..f55856c 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -278,10 +278,6 @@ struct md_object_operations { struct lustre_capa *, int renewal); int (*moo_object_sync)(const struct lu_env *, struct md_object *); - dt_obj_version_t (*moo_version_get)(const struct lu_env *, - struct md_object *); - void (*moo_version_set)(const struct lu_env *, struct md_object *, - dt_obj_version_t); int (*moo_path)(const struct lu_env *env, struct md_object *obj, char *path, int pathlen, __u64 *recno, int *linkno); int (*moo_file_lock)(const struct lu_env *env, struct md_object *obj, @@ -756,20 +752,6 @@ static inline int mo_object_sync(const struct lu_env *env, struct md_object *m) return m->mo_ops->moo_object_sync(env, m); } -static inline dt_obj_version_t mo_version_get(const struct lu_env *env, - struct md_object *m) -{ - LASSERT(m->mo_ops->moo_version_get); - return m->mo_ops->moo_version_get(env, m); -} - -static inline void mo_version_set(const struct lu_env *env, - struct md_object *m, dt_obj_version_t ver) -{ - LASSERT(m->mo_ops->moo_version_set); - return m->mo_ops->moo_version_set(env, m, ver); -} - static inline int mo_file_lock(const struct lu_env *env, struct md_object *m, struct lov_mds_md *lmm, struct ldlm_extent *extent, diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 2747214..0c891fa 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -91,7 +91,7 @@ static int mdd_device_init(const struct lu_env *env, struct lu_device *d, mdd->mdd_child = lu2dt_dev(next); /* Prepare transactions callbacks. */ - mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb; + mdd->mdd_txn_cb.dtc_txn_start = NULL; mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb; mdd->mdd_txn_cb.dtc_txn_commit = NULL; mdd->mdd_txn_cb.dtc_cookie = mdd; @@ -136,6 +136,10 @@ static void mdd_device_shutdown(const struct lu_env *env, if (m->mdd_obd_dev) mdd_fini_obd(env, m, cfg); orph_index_fini(env, m); + if (m->mdd_capa != NULL) { + lu_object_put(env, &m->mdd_capa->do_lu); + m->mdd_capa = NULL; + } /* remove upcall device*/ md_upcall_fini(&m->mdd_md_dev); EXIT; @@ -367,6 +371,7 @@ int mdd_changelog_llog_cancel(struct mdd_device *mdd, long long endrec) time. In case of crash, we just restart with old log so we're allright. */ if (endrec == cur) { + /* XXX: transaction is started by llog itself */ rc = mdd_changelog_write_header(mdd, CLM_PURGE); if (rc) goto out; @@ -377,6 +382,7 @@ int mdd_changelog_llog_cancel(struct mdd_device *mdd, long long endrec) changed since the last purge) */ mdd->mdd_cl.mc_starttime = cfs_time_current_64(); + /* XXX: transaction is started by llog itself */ rc = llog_cancel(ctxt, NULL, 1, (struct llog_cookie *)&endrec, 0); out: llog_ctxt_put(ctxt); @@ -409,6 +415,7 @@ int mdd_changelog_write_header(struct mdd_device *mdd, int markerflags) /* Status and action flags */ rec->cr.cr_markerflags = mdd->mdd_cl.mc_flags | markerflags; + /* XXX: transaction is started by llog itself */ rc = (mdd->mdd_cl.mc_mask & (1 << CL_MARK)) ? mdd_changelog_llog_write(mdd, rec, NULL) : 0; @@ -559,19 +566,6 @@ static int dot_lustre_mdd_object_sync(const struct lu_env *env, return -ENOSYS; } -static dt_obj_version_t dot_lustre_mdd_version_get(const struct lu_env *env, - struct md_object *obj) -{ - return 0; -} - -static void dot_lustre_mdd_version_set(const struct lu_env *env, - struct md_object *obj, - dt_obj_version_t version) -{ - return; -} - static int dot_lustre_mdd_path(const struct lu_env *env, struct md_object *obj, char *path, int pathlen, __u64 *recno, int *linkno) { @@ -608,8 +602,6 @@ static struct md_object_operations mdd_dot_lustre_obj_ops = { .moo_close = dot_lustre_mdd_close, .moo_capa_get = mdd_capa_get, .moo_object_sync = dot_lustre_mdd_object_sync, - .moo_version_get = dot_lustre_mdd_version_get, - .moo_version_set = dot_lustre_mdd_version_set, .moo_path = dot_lustre_mdd_path, .moo_file_lock = dot_file_lock, .moo_file_unlock = dot_file_unlock, @@ -994,12 +986,9 @@ static int mdd_process_config(const struct lu_env *env, rc = mdd_init_obd(env, m, cfg); if (rc) { - CERROR("lov init error %d \n", rc); + CERROR("lov init error %d\n", rc); GOTO(out, rc); } - rc = mdd_txn_init_credits(env, m); - if (rc) - break; mdd_changelog_init(env, m); break; @@ -1089,6 +1078,7 @@ static int mdd_prepare(const struct lu_env *env, struct mdd_device *mdd = lu2mdd_dev(cdev); struct lu_device *next = &mdd->mdd_child->dd_lu_dev; struct dt_object *root; + struct lu_fid fid; int rc; ENTRY; @@ -1115,6 +1105,14 @@ static int mdd_prepare(const struct lu_env *env, GOTO(out, rc); } + /* we use capa file to declare llog changes, + * will be fixed with new llog in 2.3 */ + root = dt_store_open(env, mdd->mdd_child, "", CAPA_KEYS, &fid); + if (!IS_ERR(root)) + mdd->mdd_capa = root; + else + rc = PTR_ERR(root); + out: RETURN(rc); } @@ -1350,6 +1348,35 @@ out: RETURN(rc); } +int mdd_declare_llog_cancel(const struct lu_env *env, struct mdd_device *mdd, + struct thandle *handle) +{ + int rc; + + + /* XXX: this is a temporary solution to declare llog changes + * will be fixed in 2.3 with new llog implementation */ + + LASSERT(mdd->mdd_capa); + + /* the llog record could be canceled either by modifying + * the plain llog's header or by destroying the llog itself + * when this record is the last one in it, it can't be known + * here, but the catlog's header will also be modified for + * the second case, then the first case can be covered and + * is no need to declare it */ + + /* destroy empty plain log */ + rc = dt_declare_destroy(env, mdd->mdd_capa, handle); + if (rc) + return rc; + + /* record the catlog's header if an empty plain log was destroyed */ + rc = dt_declare_record_write(env, mdd->mdd_capa, + sizeof(struct llog_logid_rec), 0, handle); + return rc; +} + struct mdd_changelog_user_data { __u64 mcud_endrec; /**< purge record for this user */ __u64 mcud_minrec; /**< lowest changelog recno still referenced */ @@ -1401,7 +1428,7 @@ static int mdd_changelog_user_purge_cb(struct llog_handle *llh, /* Special case: unregister this user */ if (mcud->mcud_endrec == MCUD_UNREGISTER) { struct llog_cookie cookie; - void *trans_h; + void *th; struct mdd_device *mdd = mcud->mcud_mdd; cookie.lgc_lgl = llh->lgh_id; @@ -1409,20 +1436,27 @@ static int mdd_changelog_user_purge_cb(struct llog_handle *llh, /* XXX This is a workaround for the deadlock of changelog * adding vs. changelog cancelling. LU-81. */ - mdd_txn_param_build(mcud->mcud_env, mdd, MDD_TXN_UNLINK_OP, 0); - trans_h = mdd_trans_start(mcud->mcud_env, mdd); - if (IS_ERR(trans_h)) { - CERROR("fsfilt_start_log failed: %ld\n", - PTR_ERR(trans_h)); - RETURN(PTR_ERR(trans_h)); + th = mdd_trans_create(mcud->mcud_env, mdd); + if (IS_ERR(th)) { + CERROR("Cannot get thandle\n"); + RETURN(-ENOMEM); } + rc = mdd_declare_llog_cancel(mcud->mcud_env, mdd, th); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(mcud->mcud_env, mdd, th); + if (rc) + GOTO(stop, rc); + rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie); if (rc == 0) mcud->mcud_usercount--; - mdd_trans_stop(mcud->mcud_env, mdd, rc, trans_h); +stop: + mdd_trans_stop(mcud->mcud_env, mdd, 0, th); RETURN(rc); } diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 14c0ed8..38ec54a 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -81,6 +81,9 @@ static struct lu_name lname_dotdot = { static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask); +static int mdd_declare_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct thandle *handle); static int mdd_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, const struct lu_fid *pfid, @@ -609,6 +612,63 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, RETURN(rc); } +int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd, + int reclen, struct thandle *handle) +{ + int rc; + + /* XXX: this is a temporary solution to declare llog changes + * will be fixed in 2.3 with new llog implementation */ + + LASSERT(mdd->mdd_capa); + + /* record itself */ + rc = dt_declare_record_write(env, mdd->mdd_capa, reclen, 0, handle); + if (rc) + return rc; + + /* header will be updated as well */ + rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE, + 0, handle); + if (rc) + return rc; + + /* also we should be able to create new plain log */ + rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle); + if (rc) + return rc; + + /* new record referencing new plain llog */ + rc = dt_declare_record_write(env, mdd->mdd_capa, + sizeof(struct llog_logid_rec), 0, handle); + if (rc) + return rc; + + /* catalog's header will be updated as well */ + rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE, + 0, handle); + + return rc; +} + +int mdd_declare_changelog_store(const struct lu_env *env, + struct mdd_device *mdd, + const struct lu_name *fname, + struct thandle *handle) +{ + int reclen; + + /* Not recording */ + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + return 0; + + /* we'll be writing payload + llog header */ + reclen = sizeof(struct llog_changelog_rec); + if (fname) + reclen += llog_data_len(fname->ln_namelen); + + return mdd_declare_llog_record(env, mdd, reclen, handle); +} /** Store a namespace change changelog record * If this fails, we must fail the whole transaction; we don't @@ -674,6 +734,40 @@ static int mdd_changelog_ns_store(const struct lu_env *env, return 0; } +static int mdd_declare_link(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *p, + struct mdd_object *c, + const struct lu_name *name, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle); + if (rc) + return rc; + + rc = mdo_declare_ref_add(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, p, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, c, NULL, handle); + if (rc) + return rc; + + rc = mdd_declare_links_add(env, c, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + + return rc; +} + static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct md_object *src_obj, const struct lu_name *lname, struct md_attr *ma) @@ -713,11 +807,18 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP, 1); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -758,6 +859,7 @@ out_trans: if (rc == 0) rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj, mdd_tobj, NULL, lname, handle); +stop: mdd_trans_stop(env, mdd, rc, handle); out_pending: #ifdef HAVE_QUOTA_SUPPORT @@ -773,6 +875,20 @@ out_pending: return rc; } +int mdd_declare_finish_unlink(const struct lu_env *env, + struct mdd_object *obj, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = orph_declare_index_insert(env, obj, handle); + if (rc) + return rc; + + return mdd_declare_object_kill(env, obj, ma, handle); +} + /* caller should take a lock before calling */ int mdd_finish_unlink(const struct lu_env *env, struct mdd_object *obj, struct md_attr *ma, @@ -805,7 +921,7 @@ int mdd_finish_unlink(const struct lu_env *env, PFID(mdd_object_fid(obj)), obj->mod_count); } else { - rc = mdd_object_kill(env, obj, ma); + rc = mdd_object_kill(env, obj, ma, th); if (rc == 0) reset = 0; } @@ -832,6 +948,50 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj, RETURN(rc); } +static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd, + struct mdd_object *p, struct mdd_object *c, + const struct lu_name *name, struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_index_delete(env, p, name->ln_name, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, p, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, p, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, c, NULL, handle); + if (rc) + return rc; + + rc = mdd_declare_finish_unlink(env, c, ma, handle); + if (rc) + return rc; + + rc = mdd_declare_links_add(env, c, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + + return rc; +} + static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct lu_name *lname, struct md_attr *ma) @@ -857,14 +1017,19 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n", PFID(mdd_object_fid(mdd_cobj))); - rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP, 1); - if (rc) - RETURN(rc); - - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); + rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj, + lname, ma, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -942,6 +1107,7 @@ out_trans: mdd_cobj, mdd_pobj, NULL, lname, handle); } +stop: mdd_trans_stop(env, mdd, rc, handle); #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) @@ -998,6 +1164,10 @@ static int mdd_name_insert(const struct lu_env *env, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) { @@ -1019,11 +1189,12 @@ static int mdd_name_insert(const struct lu_env *env, } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP, 0); - handle = mdd_trans_start(env, mdo2mdd(pobj)); + handle = mdd_trans_create(env, mdo2mdd(pobj)); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdo2mdd(pobj), handle); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1113,6 +1284,10 @@ static int mdd_name_remove(const struct lu_env *env, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; @@ -1124,11 +1299,12 @@ static int mdd_name_remove(const struct lu_env *env, } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP, 0); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdd, handle); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1223,6 +1399,10 @@ static int mdd_rename_tgt(const struct lu_env *env, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && !tobj) { struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; @@ -1240,14 +1420,12 @@ static int mdd_rename_tgt(const struct lu_env *env, } } #endif - if (tobj && mdd_object_exists(mdd_tobj)) - mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_TGT_OP,1); - else - mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP, 1); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdd, handle); + dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1355,6 +1533,27 @@ static int mdd_cd_sanity_check(const struct lu_env *env, } +static int mdd_declare_create_data(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + int lmm_size, + struct thandle *handle) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + int rc; + + buf->lb_buf = NULL; + buf->lb_len = lmm_size; + rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + return rc; + + rc = mdd_declare_lov_objid_update(env, mdd, handle); + + return rc; +} + static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct md_op_spec *spec, struct md_attr *ma) @@ -1381,11 +1580,18 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (rc) RETURN(rc); - mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_CREATE_DATA_OP, 0); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); + rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + /* * XXX: Setting the lov ea is not locked but setting the attr is locked? * Should this be fixed? @@ -1411,6 +1617,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (rc == 0) mdd_lov_objid_update(mdd, lmm); +stop: mdd_trans_stop(env, mdd, rc, handle); out_free: /* Finish mdd_lov_create() stuff. */ @@ -1468,6 +1675,26 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, RETURN(rc); } +int mdd_declare_object_initialize(const struct lu_env *env, + struct mdd_object *child, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_attr_set(env, child, &ma->ma_attr, handle); + if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) { + rc = mdo_declare_index_insert(env, child, mdo2fid(child), + dot, handle); + if (rc == 0) + rc = mdo_declare_ref_add(env, child, handle); + } + if (rc == 0) + mdd_declare_links_add(env, child, handle); + + return rc; +} + int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, const struct lu_name *lname, struct mdd_object *child, struct md_attr *ma, struct thandle *handle, @@ -1590,6 +1817,76 @@ static int mdd_create_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_declare_create(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *p, + struct mdd_object *c, + const struct lu_name *name, + struct md_attr *ma, + int lmm_size, + struct thandle *handle, + const struct md_op_spec *spec) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + int rc = 0; + + rc = mdd_declare_object_create_internal(env, p, c, ma, handle, spec); + if (rc) + GOTO(out, rc); + + /* if dir, then can inherit default ACl */ + buf->lb_buf = NULL; + buf->lb_len = lmm_size; + if (S_ISDIR(ma->ma_attr.la_mode)) { + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_DEFAULT, + 0, handle); + if (rc == 0) + rc = mdo_declare_ref_add(env, p, handle); + } + if (rc) + GOTO(out, rc); + + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_ACCESS, + 0, handle); + if (rc) + GOTO(out, rc); + + rc = mdd_declare_object_initialize(env, c, ma, handle); + if (rc) + GOTO(out, rc); + + rc = mdo_declare_index_insert(env, p, mdo2fid(c), + name->ln_name, handle); + if (rc) + GOTO(out, rc); + + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + GOTO(out, rc); + + if (S_ISLNK(ma->ma_attr.la_mode)) { + rc = dt_declare_record_write(env, mdd_object_child(c), + strlen(spec->u.sp_symname), 0, + handle); + if (rc) + GOTO(out, rc); + } + rc = mdo_declare_attr_set(env, p, &ma->ma_attr, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + if (rc) + return rc; + + rc = mdd_declare_lov_objid_update(env, mdd, handle); + +out: + return rc; +} + + /* * Create object and insert it into namespace. */ @@ -1738,11 +2035,19 @@ static int mdd_create(const struct lu_env *env, got_def_acl = 1; } - mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_MKDIR_OP, 1); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); + rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, ma, + lmm_size, handle, spec); + if (rc) + GOTO(out_stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(out_stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1869,6 +2174,7 @@ out_trans: S_ISREG(attr->la_mode) ? CL_CREATE : S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD, 0, son, mdd_pobj, NULL, lname, handle); +out_stop: mdd_trans_stop(env, mdd, rc, handle); out_free: /* finish lov_create stuff, free all temporary data */ @@ -1974,6 +2280,124 @@ static int mdd_rename_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_declare_rename(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *mdd_spobj, + struct mdd_object *mdd_tpobj, + struct mdd_object *mdd_sobj, + struct mdd_object *mdd_tobj, + const struct lu_name *sname, + const struct lu_name *tname, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + LASSERT(mdd_spobj); + LASSERT(mdd_tpobj); + LASSERT(mdd_sobj); + + /* name from source dir */ + rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle); + if (rc) + return rc; + + /* .. from source child */ + if (S_ISDIR(mdd_object_type(mdd_sobj))) { + /* source child can be directory, + * counted by source dir's nlink */ + rc = mdo_declare_ref_del(env, mdd_spobj, handle); + if (rc) + return rc; + + rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle); + if (rc) + return rc; + + rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj), + dotdot, handle); + if (rc) + return rc; + + /* new target child can be directory, + * counted by target dir's nlink */ + rc = mdo_declare_ref_add(env, mdd_tpobj, handle); + if (rc) + return rc; + + } + + rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle); + if (rc) + return rc; + mdd_declare_links_add(env, mdd_sobj, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle); + if (rc) + return rc; + + /* new name */ + rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj), + tname->ln_name, handle); + if (rc) + return rc; + + /* name from target dir (old name), we declare it unconditionally + * as mdd_rename() calls delete unconditionally as well. so just + * to balance declarations vs calls to change ... */ + rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle); + if (rc) + return rc; + + if (mdd_tobj && mdd_object_exists(mdd_tobj)) { + /* delete target child in target parent directory */ + rc = mdo_declare_ref_del(env, mdd_tobj, handle); + if (rc) + return rc; + + if (S_ISDIR(mdd_object_type(mdd_tobj))) { + /* target child can be directory, + * delete "." reference in target child directory */ + rc = mdo_declare_ref_del(env, mdd_tobj, handle); + if (rc) + return rc; + + /* delete ".." reference in target parent directory */ + rc = mdo_declare_ref_del(env, mdd_tpobj, handle); + if (rc) + return rc; + } + + rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle); + if (rc) + return rc; + + mdd_declare_links_add(env, mdd_tobj, handle); + if (rc) + return rc; + + rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle); + if (rc) + return rc; + } + + rc = mdd_declare_changelog_store(env, mdd, tname, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, sname, handle); + if (rc) + return rc; + + return rc; +} + /* src object can be remote that is why we use only fid and type of object */ static int mdd_rename(const struct lu_env *env, struct md_object *src_pobj, struct md_object *tgt_pobj, @@ -2040,14 +2464,21 @@ static int mdd_rename(const struct lu_env *env, } } #endif - if (tobj && mdd_object_exists(mdd_tobj)) - mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_OP, 2); - else - mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP, 2); - handle = mdd_trans_start(env, mdd); + mdd_sobj = mdd_object_find(env, mdd, lf); + + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj, + mdd_tobj, lsname, ltname, ma, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + /* FIXME: Should consider tobj and sobj too in rename_lock. */ rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj); if (rc < 0) @@ -2073,7 +2504,6 @@ static int mdd_rename(const struct lu_env *env, if (sdlh == NULL || tdlh == NULL) GOTO(cleanup, rc = -ENOMEM); - mdd_sobj = mdd_object_find(env, mdd, lf); rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj, ma); if (rc) @@ -2258,6 +2688,7 @@ cleanup_unlocked: ltname, handle); } +stop: mdd_trans_stop(env, mdd, rc, handle); if (mdd_sobj) mdd_object_put(env, mdd_sobj); @@ -2399,6 +2830,20 @@ static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, return 0; } +static int mdd_declare_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct thandle *handle) +{ + int rc; + + /* XXX: max size? */ + rc = mdo_declare_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, NULL, 4096), + XATTR_NAME_LINK, 0, handle); + + return rc; +} + /* For pathologic linkers, we don't want to spend lots of time scanning the * link ea. Limit ourseleves to something reasonable; links not in the EA * can be looked up via (slower) parent lookup. diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 1a525a3..4080998 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -71,28 +71,6 @@ static inline void mdd_quota_wrapper(struct lu_attr *la, unsigned int *qids) * are already protected by ldlm lock */ #define MDD_DISABLE_PDO_LOCK 1 -enum mdd_txn_op { - MDD_TXN_OBJECT_DESTROY_OP = 0, - MDD_TXN_OBJECT_CREATE_OP, - MDD_TXN_ATTR_SET_OP, - MDD_TXN_XATTR_SET_OP, - MDD_TXN_INDEX_INSERT_OP, - MDD_TXN_INDEX_DELETE_OP, - MDD_TXN_LINK_OP, - MDD_TXN_UNLINK_OP, - MDD_TXN_RENAME_OP, - MDD_TXN_RENAME_TGT_OP, - MDD_TXN_CREATE_DATA_OP, - MDD_TXN_MKDIR_OP, - MDD_TXN_CLOSE_OP, - MDD_TXN_LAST_OP -}; - -struct mdd_txn_op_descr { - enum mdd_txn_op mod_op; - unsigned int mod_credits; -}; - /* Changelog flags */ /** changelog is recording */ #define CLM_ON 0x00001 @@ -128,10 +106,10 @@ struct mdd_device { struct lu_fid mdd_root_fid; struct dt_device_param mdd_dt_conf; struct dt_object *mdd_orphans; + struct dt_object *mdd_capa; struct dt_txn_callback mdd_txn_cb; cfs_proc_dir_entry_t *mdd_proc_entry; struct lprocfs_stats *mdd_stats; - struct mdd_txn_op_descr mdd_tod[MDD_TXN_LAST_OP]; struct mdd_changelog mdd_cl; unsigned long mdd_atime_diff; struct mdd_object *mdd_dot_lustre; @@ -173,7 +151,6 @@ struct mdd_object { }; struct mdd_thread_info { - struct txn_param mti_param; struct lu_fid mti_fid; struct lu_fid mti_fid2; /* used for be & cpu converting */ struct lu_attr mti_la; @@ -225,6 +202,8 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, struct lov_mds_md **lmm, int *lmm_size, const struct md_op_spec *spec, struct lu_attr *la); int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm); +int mdd_declare_lov_objid_update(const struct lu_env *, struct mdd_device *, + struct thandle *); void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm); void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd, struct lov_mds_md *lmm, int lmm_size, @@ -251,8 +230,10 @@ int mdd_attr_check_set_internal(const struct lu_env *env, struct lu_attr *attr, struct thandle *handle, int needacl); +int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma, struct thandle *handle); int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, - struct md_attr *ma); + struct md_attr *ma, struct thandle *handle); int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma); int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj, @@ -325,6 +306,8 @@ void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, struct lu_name *lname, struct lu_fid *pfid); /* mdd_lov.c */ +int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma, struct thandle *handle); int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *mdd_cobj, struct md_attr *ma); @@ -353,7 +336,10 @@ int __mdd_orphan_del(const struct lu_env *, struct mdd_object *, struct thandle *); int orph_index_init(const struct lu_env *env, struct mdd_device *mdd); void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd); -int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd); +int orph_declare_index_insert(const struct lu_env *, struct mdd_object *, + struct thandle *); +int orph_declare_index_delete(const struct lu_env *, struct mdd_object *, + struct thandle *); /* mdd_lproc.c */ void lprocfs_mdd_init_vars(struct lprocfs_static_vars *lvars); @@ -382,8 +368,20 @@ struct mdd_object *mdd_object_find(const struct lu_env *env, int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm); int mdd_readpage(const struct lu_env *env, struct md_object *obj, const struct lu_rdpg *rdpg); +int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd, + int reclen, struct thandle *handle); +int mdd_declare_changelog_store(const struct lu_env *env, + struct mdd_device *mdd, + const struct lu_name *fname, + struct thandle *handle); int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type, int flags, struct md_object *obj); +int mdd_declare_object_create_internal(const struct lu_env *env, + struct mdd_object *p, + struct mdd_object *c, + struct md_attr *ma, + struct thandle *handle, + const struct md_op_spec *spec); /* mdd_quota.c*/ #ifdef HAVE_QUOTA_SUPPORT int mdd_quota_notify(const struct lu_env *env, struct md_device *m); @@ -416,19 +414,6 @@ int mdd_quota_finvalidate(const struct lu_env *env, struct md_device *m, #endif /* mdd_trans.c */ -void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd, - enum mdd_txn_op, int changelog_cnt); -int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd, - struct lov_mds_md *lmm, enum mdd_txn_op op, - int changelog_cnt); -int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj, - struct md_attr *ma, enum mdd_txn_op, - int changelog_cnt); -void mdd_setattr_txn_param_build(const struct lu_env *env, - struct md_object *obj, - struct md_attr *ma, enum mdd_txn_op, - int changelog_cnt); - int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *obj, struct lu_attr *la); @@ -438,17 +423,16 @@ static inline void mdd_object_put(const struct lu_env *env, lu_object_put(env, &o->mod_obj.mo_lu); } -struct thandle* mdd_trans_start(const struct lu_env *env, - struct mdd_device *); - +struct thandle *mdd_trans_create(const struct lu_env *env, + struct mdd_device *mdd); +int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd, + struct thandle *th); void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd, int rc, struct thandle *handle); - -int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param, - void *cookie); - int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn, void *cookie); +int mdd_txn_start_cb(const struct lu_env *env, struct thandle *, + void *cookie); /* mdd_device.c */ struct lu_object *mdd_object_alloc(const struct lu_env *env, @@ -481,6 +465,8 @@ int mdd_def_acl_get(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma); int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode, struct thandle *handle); +int __mdd_declare_acl_init(const struct lu_env *env, struct mdd_object *obj, + int is_dir, struct thandle *handle); int __mdd_acl_init(const struct lu_env *env, struct mdd_object *obj, struct lu_buf *buf, __u32 *mode, struct thandle *handle); int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj, @@ -673,8 +659,19 @@ static inline int mdo_attr_get(const struct lu_env *env, struct mdd_object *obj, return next->do_ops->do_attr_get(env, next, la, capa); } -static inline int mdo_attr_set(const struct lu_env *env, struct mdd_object *obj, - const struct lu_attr *la, struct thandle *handle, +static inline int mdo_declare_attr_set(const struct lu_env *env, + struct mdd_object *obj, + const struct lu_attr *la, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(obj); + return dt_declare_attr_set(env, next, la, handle); +} + +static inline int mdo_attr_set(const struct lu_env *env, + struct mdd_object *obj, + const struct lu_attr *la, + struct thandle *handle, struct lustre_capa *capa) { struct dt_object *next = mdd_object_child(obj); @@ -690,6 +687,16 @@ static inline int mdo_xattr_get(const struct lu_env *env,struct mdd_object *obj, return next->do_ops->do_xattr_get(env, next, buf, name, capa); } +static inline int mdo_declare_xattr_set(const struct lu_env *env, + struct mdd_object *obj, + const struct lu_buf *buf, + const char *name, + int fl, struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(obj); + return dt_declare_xattr_set(env, next, buf, name, fl, handle); +} + static inline int mdo_xattr_set(const struct lu_env *env,struct mdd_object *obj, const struct lu_buf *buf, const char *name, int fl, struct thandle *handle, @@ -701,6 +708,15 @@ static inline int mdo_xattr_set(const struct lu_env *env,struct mdd_object *obj, capa); } +static inline int mdo_declare_xattr_del(const struct lu_env *env, + struct mdd_object *obj, + const char *name, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(obj); + return dt_declare_xattr_del(env, next, name, handle); +} + static inline int mdo_xattr_del(const struct lu_env *env,struct mdd_object *obj, const char *name, struct thandle *handle, struct lustre_capa *capa) @@ -727,16 +743,69 @@ int mdo_index_try(const struct lu_env *env, struct mdd_object *obj, return next->do_ops->do_index_try(env, next, feat); } -static inline void mdo_ref_add(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle) +static inline +int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj, + const struct lu_fid *fid, const char *name, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(obj); + int rc = 0; + + /* + * if the object doesn't exist yet, then it's supposed to be created + * and declaration of the creation should be enough to insert ./.. + */ + if (mdd_object_exists(obj)) { + rc = -ENOTDIR; + if (dt_try_as_dir(env, next)) + rc = dt_declare_insert(env, next, + (struct dt_rec *)fid, + (const struct dt_key *)name, + handle); + } + + return rc; +} + +static inline +int mdo_declare_index_delete(const struct lu_env *env, struct mdd_object *obj, + const char *name, struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(obj); + + if (!dt_try_as_dir(env, next)) + return -ENOTDIR; + + return dt_declare_delete(env, next, (const struct dt_key *)name, + handle); +} + +static inline int mdo_declare_ref_add(const struct lu_env *env, + struct mdd_object *obj, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(obj); + return dt_declare_ref_add(env, next, handle); +} + +static inline int mdo_ref_add(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle) { struct dt_object *next = mdd_object_child(obj); LASSERT(mdd_object_exists(obj)); return next->do_ops->do_ref_add(env, next, handle); } -static inline void mdo_ref_del(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle) +static inline int mdo_declare_ref_del(const struct lu_env *env, + struct mdd_object *obj, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(obj); + return dt_declare_ref_del(env, next, handle); +} + +static inline int mdo_ref_del(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle) { struct dt_object *next = mdd_object_child(obj); LASSERT(mdd_object_exists(obj)); @@ -744,6 +813,18 @@ static inline void mdo_ref_del(const struct lu_env *env, struct mdd_object *obj, } static inline +int mdo_declare_create_obj(const struct lu_env *env, struct mdd_object *o, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(o); + return next->do_ops->do_declare_create(env, next, attr, hint, + dof, handle); +} + +static inline int mdo_create_obj(const struct lu_env *env, struct mdd_object *o, struct lu_attr *attr, struct dt_allocation_hint *hint, @@ -754,6 +835,22 @@ int mdo_create_obj(const struct lu_env *env, struct mdd_object *o, return next->do_ops->do_create(env, next, attr, hint, dof, handle); } +static inline +int mdo_declare_destroy(const struct lu_env *env, struct mdd_object *o, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(o); + return dt_declare_destroy(env, next, handle); +} + +static inline +int mdo_destroy(const struct lu_env *env, struct mdd_object *o, + struct thandle *handle) +{ + struct dt_object *next = mdd_object_child(o); + return dt_destroy(env, next, handle); +} + static inline struct obd_capa *mdo_capa_get(const struct lu_env *env, struct mdd_object *obj, struct lustre_capa *old, diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 69df68e..e9a1417 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -369,6 +369,24 @@ int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm) return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm); } +int mdd_declare_lov_objid_update(const struct lu_env *env, + struct mdd_device *mdd, + struct thandle *handle) +{ + struct obd_device *obd = mdd2obd_dev(mdd); + int size; + + /* in prepare we create local files */ + if (unlikely(mdd->mdd_capa == NULL)) + return 0; + + /* XXX: this is a temporary solution to declare llog changes + * will be fixed in 2.3 with new llog implementation */ + + size = obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id); + return dt_declare_record_write(env, mdd->mdd_capa, size, 0, handle); +} + void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm) { /* copy mds_lov code is using wrong layer */ @@ -661,6 +679,48 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, RETURN(rc); } +int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma, struct thandle *handle) +{ + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + int rc, stripe, i; + + LASSERT(obj); + LASSERT(ma); + + if (!S_ISREG(lu_object_attr(&obj->mod_obj.mo_lu))) + return 0; + + rc = mdd_lmm_get_locked(env, obj, ma); + if (rc || !(ma->ma_valid & MA_LOV)) + return rc; + + LASSERT(ma->ma_lmm); + if (le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V1 && + le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V3) { + CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n", + mdd->mdd_obd_dev->obd_name, + le32_to_cpu(ma->ma_lmm->lmm_magic), + PFID(lu_object_fid(&obj->mod_obj.mo_lu))); + return -EINVAL; + } + + if ((int)le32_to_cpu(ma->ma_lmm->lmm_stripe_count) < 0) + stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count; + else + stripe = le32_to_cpu(ma->ma_lmm->lmm_stripe_count); + + for (i = 0; i < stripe; i++) { + rc = mdd_declare_llog_record(env, mdd, + sizeof(struct llog_unlink_rec), + handle); + if (rc) + return rc; + } + + return rc; +} + int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *mdd_cobj, struct md_attr *ma) { diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index fa25ba2..9c85d46 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -55,6 +55,7 @@ #include /* fid_be_cpu(), fid_cpu_to_be(). */ #include +#include #include #include @@ -870,6 +871,30 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj, RETURN(rc); } +int mdd_declare_object_create_internal(const struct lu_env *env, + struct mdd_object *p, + struct mdd_object *c, + struct md_attr *ma, + struct thandle *handle, + const struct md_op_spec *spec) +{ + struct dt_object_format *dof = &mdd_env_info(env)->mti_dof; + const struct dt_index_features *feat = spec->sp_feat; + int rc; + ENTRY; + + if (feat != &dt_directory_features && feat != NULL) + dof->dof_type = DFT_INDEX; + else + dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode); + + dof->u.dof_idx.di_feat = feat; + + rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle); + + RETURN(rc); +} + int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct mdd_object *c, struct md_attr *ma, struct thandle *handle, @@ -1279,6 +1304,7 @@ static int mdd_changelog_data_store(const struct lu_env *env, RETURN(0); LASSERT(mdd_obj != NULL); + LASSERT(handle != NULL); if ((type >= CL_MTIME) && (type <= CL_ATIME) && cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) { @@ -1300,20 +1326,11 @@ static int mdd_changelog_data_store(const struct lu_env *env, rec->cr.cr_namelen = 0; mdd_obj->mod_cltime = cfs_time_current_64(); - if (handle == NULL) { - /* Used for the close event only for now. */ - LASSERT(type == CL_CLOSE); - LASSERT(mdd_env_info(env)->mti_param.tp_credits != 0); - th = mdd_trans_start(env, mdd); - if (IS_ERR(th)) - GOTO(err, rc = PTR_ERR(th)); - } - rc = mdd_changelog_llog_write(mdd, rec, handle ? : th); if (th) mdd_trans_stop(env, mdd, rc, th); -err: + if (rc < 0) { CERROR("changelog failed: rc=%d op%d t"DFID"\n", rc, type, PFID(tfid)); @@ -1332,14 +1349,22 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type, int rc; ENTRY; - handle = mdd_trans_start(env, mdd); - + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) return(PTR_ERR(handle)); + rc = mdd_declare_changelog_store(env, mdd, NULL, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj, handle); +stop: mdd_trans_stop(env, mdd, rc, handle); RETURN(rc); @@ -1452,6 +1477,69 @@ static int mdd_attr_set_changelog(const struct lu_env *env, md2mdd_obj(obj), handle); } +static int mdd_declare_attr_set(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + const struct md_attr *ma, + struct lov_mds_md *lmm, + struct thandle *handle) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + int rc, stripe, i; + + rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, NULL, handle); + if (rc) + return rc; + + if (ma->ma_valid & MA_LOV) { + buf->lb_buf = NULL; + buf->lb_len = ma->ma_lmm_size; + rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + return rc; + } + + if (ma->ma_valid & (MA_HSM | MA_SOM)) { + buf->lb_buf = NULL; + buf->lb_len = sizeof(struct lustre_mdt_attrs); + rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, + 0, handle); + if (rc) + return rc; + } + + /* basically the log is the same as in unlink case */ + if (lmm) { + if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 && + le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) { + CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n", + mdd->mdd_obd_dev->obd_name, + le32_to_cpu(lmm->lmm_magic), + PFID(lu_object_fid(&obj->mod_obj.mo_lu))); + return -EINVAL; + } + + stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count; + if ((int)le32_to_cpu(lmm->lmm_stripe_count) >= 0) + stripe = le32_to_cpu(lmm->lmm_stripe_count); + + for (i = 0; i < stripe; i++) { + rc = mdd_declare_llog_record(env, mdd, + sizeof(struct llog_unlink_rec), + handle); + if (rc) + return rc; + } + } + + return rc; +} + /* set attr and LOV EA at once, return updated attr */ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, const struct md_attr *ma) @@ -1461,7 +1549,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, struct thandle *handle; struct lov_mds_md *lmm = NULL; struct llog_cookie *logcookies = NULL; - int rc, lmm_size = 0, cookie_size = 0, chlog_cnt; + int rc, lmm_size = 0, cookie_size = 0; struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; struct obd_device *obd = mdd->mdd_obd_dev; struct mds_obd *mds = &obd->u.mds; @@ -1484,33 +1572,36 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0) RETURN(0); - /*TODO: add lock here*/ - /* start a log jounal handle if needed */ if (S_ISREG(mdd_object_type(mdd_obj)) && ma->ma_attr.la_valid & (LA_UID | LA_GID)) { lmm_size = mdd_lov_mdsize(env, mdd); lmm = mdd_max_lmm_get(env, mdd); if (lmm == NULL) - GOTO(no_trans, rc = -ENOMEM); + RETURN(-ENOMEM); rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size, XATTR_NAME_LOV); if (rc < 0) - GOTO(no_trans, rc); - } - - chlog_cnt = 1; - if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) { - chlog_cnt += (lmm->lmm_stripe_count >= 0) ? - lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count; + RETURN(rc); } - mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma, - MDD_TXN_ATTR_SET_OP, chlog_cnt); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) - GOTO(no_trans, rc = PTR_ERR(handle)); + RETURN(PTR_ERR(handle)); + + rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma, + lmm_size > 0 ? lmm : NULL, handle); + if (rc) + GOTO(stop, rc); + + /* permission changes may require sync operation */ + if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) + handle->th_sync = !!mdd->mdd_sync_permission; + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); /* permission changes may require sync operation */ if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) @@ -1595,8 +1686,8 @@ cleanup: if (rc == 0) rc = mdd_attr_set_changelog(env, obj, handle, ma->ma_attr.la_valid); +stop: mdd_trans_stop(env, mdd, rc, handle); -no_trans: if (rc == 0 && (lmm != NULL && lmm_size > 0 )) { /*set obd attr, if needed*/ rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size, @@ -1654,6 +1745,27 @@ static int mdd_xattr_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_declare_xattr_set(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + const struct lu_buf *buf, + const char *name, + struct thandle *handle) + +{ + int rc; + + rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle); + if (rc) + return rc; + + /* Only record user xattr changes */ + if ((strncmp("user.", name, 5) == 0)) + rc = mdd_declare_changelog_store(env, mdd, NULL, handle); + + return rc; +} + /** * The caller should guarantee to update the object ctime * after xattr_set if needed. @@ -1672,12 +1784,24 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj, if (rc) RETURN(rc); - mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); /* security-replated changes may require sync */ + if (!strcmp(name, XATTR_NAME_ACL_ACCESS) && + mdd->mdd_sync_permission == 1) + handle->th_sync = 1; + + rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + + /* security-replated changes may require sync */ if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) handle->th_sync |= mdd->mdd_sync_permission; @@ -1692,11 +1816,32 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj, sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)) rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj, handle); + +stop: mdd_trans_stop(env, mdd, rc, handle); RETURN(rc); } +static int mdd_declare_xattr_del(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + const char *name, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_xattr_del(env, obj, name, handle); + if (rc) + return rc; + + /* Only record user xattr changes */ + if ((strncmp("user.", name, 5) == 0)) + rc = mdd_declare_changelog_store(env, mdd, NULL, handle); + + return rc; +} + /** * The caller should guarantee to update the object ctime * after xattr_set if needed. @@ -1714,11 +1859,18 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj, if (rc) RETURN(rc); - mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); + rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); rc = mdo_xattr_del(env, mdd_obj, name, handle, mdd_object_capa(env, mdd_obj)); @@ -1734,6 +1886,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj, rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj, handle); +stop: mdd_trans_stop(env, mdd, rc, handle); RETURN(rc); @@ -1756,6 +1909,10 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + /* * Check -ENOENT early here because we need to get object type * to calculate credits before transaction start @@ -1765,14 +1922,12 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj, LASSERT(mdd_object_exists(mdd_obj) > 0); - rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0); - if (rc) - RETURN(rc); - - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(-ENOMEM); + rc = mdd_trans_start(env, mdd, handle); + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma); @@ -1865,6 +2020,10 @@ static int mdd_object_create(const struct lu_env *env, int rc = 0; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD; @@ -1890,11 +2049,12 @@ static int mdd_object_create(const struct lu_env *env, } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdd, handle); + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); rc = mdd_oc_sanity_check(env, mdd_obj, ma); if (rc) @@ -1977,11 +2137,16 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj, int rc; ENTRY; - mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0); - handle = mdd_trans_start(env, mdd); + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(-ENOMEM); + rc = mdd_trans_start(env, mdd, handle); + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj); if (rc == 0) @@ -2104,10 +2269,22 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj, return rc; } +int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma, struct thandle *handle) +{ + int rc; + + rc = mdd_declare_unlink_log(env, obj, ma, handle); + if (rc) + return rc; + + return mdo_declare_destroy(env, obj, handle); +} + /* return md_attr back, * if it is last unlink then return lov ea + llog cookie*/ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, - struct md_attr *ma) + struct md_attr *ma, struct thandle *handle) { int rc = 0; ENTRY; @@ -2121,9 +2298,27 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj), obj, ma); } + + if (rc == 0) + rc = mdo_destroy(env, obj, handle); + RETURN(rc); } +static int mdd_declare_close(const struct lu_env *env, + struct mdd_object *obj, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = orph_declare_index_delete(env, obj, handle); + if (rc) + return rc; + + return mdd_declare_object_kill(env, obj, ma, handle); +} + /* * No permission check is needed. */ @@ -2157,13 +2352,21 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, if (mdd_obj->mod_count == 1 && (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) { again: - rc = mdd_log_txn_param_build(env, obj, ma, - MDD_TXN_UNLINK_OP, 1); - if (rc) - RETURN(rc); - handle = mdd_trans_start(env, mdo2mdd(obj)); + handle = mdd_trans_create(env, mdo2mdd(obj)); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); + + rc = mdd_declare_close(env, mdd_obj, ma, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_declare_changelog_store(env, mdd, NULL, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdo2mdd(obj), handle); + if (rc) + GOTO(stop, rc); } mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); @@ -2178,6 +2381,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) { /* remove link to object from orphan index */ + LASSERT(handle != NULL); rc = __mdd_orphan_del(env, mdd_obj, handle); if (rc == 0) { CDEBUG(D_HA, "Object "DFID" is deleted from orphan " @@ -2210,7 +2414,27 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, ma->ma_attr_flags & MDS_CLOSE_CLEANUP) { rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr); } else { - rc = mdd_object_kill(env, mdd_obj, ma); + if (handle == NULL) { + handle = mdd_trans_create(env, mdo2mdd(obj)); + if (IS_ERR(handle)) + GOTO(out, rc = PTR_ERR(handle)); + + rc = mdd_declare_object_kill(env, mdd_obj, ma, + handle); + if (rc) + GOTO(out, rc); + + rc = mdd_declare_changelog_store(env, mdd, + NULL, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdo2mdd(obj), handle); + if (rc) + GOTO(out, rc); + } + + rc = mdd_object_kill(env, mdd_obj, ma, handle); if (rc == 0) reset = 0; } @@ -2231,12 +2455,26 @@ out: if (rc == 0 && (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) && !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) { - if (handle == 0) - mdd_txn_param_build(env, mdd, MDD_TXN_CLOSE_OP, 1); + if (handle == NULL) { + handle = mdd_trans_create(env, mdo2mdd(obj)); + if (IS_ERR(handle)) + GOTO(stop, rc = IS_ERR(handle)); + + rc = mdd_declare_changelog_store(env, mdd, NULL, + handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdo2mdd(obj), handle); + if (rc) + GOTO(stop, rc); + } + mdd_changelog_data_store(env, mdd, CL_CLOSE, mode, mdd_obj, handle); } +stop: if (handle != NULL) mdd_trans_stop(env, mdd, rc, handle); #ifdef HAVE_QUOTA_SUPPORT @@ -2289,7 +2527,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, int len; int recsize; - len = iops->key_size(env, it); + len = iops->key_size(env, it); /* IAM iterator can return record with zero len. */ if (len == 0) @@ -2305,7 +2543,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, recsize = lu_dirent_calc_size(len, attr); if (nob >= recsize) { - result = iops->rec(env, it, ent, attr); + result = iops->rec(env, it, (struct dt_rec *)ent, attr); if (result == -ESTALE) goto next; if (result != 0) @@ -2501,24 +2739,6 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj) return next->do_ops->do_object_sync(env, next); } -static dt_obj_version_t mdd_version_get(const struct lu_env *env, - struct md_object *obj) -{ - struct mdd_object *mdd_obj = md2mdd_obj(obj); - - LASSERT(mdd_object_exists(mdd_obj)); - return do_version_get(env, mdd_object_child(mdd_obj)); -} - -static void mdd_version_set(const struct lu_env *env, struct md_object *obj, - dt_obj_version_t version) -{ - struct mdd_object *mdd_obj = md2mdd_obj(obj); - - LASSERT(mdd_object_exists(mdd_obj)); - do_version_set(env, mdd_object_child(mdd_obj), version); -} - const struct md_object_operations mdd_obj_ops = { .moo_permission = mdd_permission, .moo_attr_get = mdd_attr_get, @@ -2537,8 +2757,6 @@ const struct md_object_operations mdd_obj_ops = { .moo_changelog = mdd_changelog, .moo_capa_get = mdd_capa_get, .moo_object_sync = mdd_object_sync, - .moo_version_get = mdd_version_get, - .moo_version_set = mdd_version_set, .moo_path = mdd_path, .moo_file_lock = mdd_file_lock, .moo_file_unlock = mdd_file_unlock, diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index 2902df5..108988a 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -57,6 +57,7 @@ #include "mdd_internal.h" const char orph_index_name[] = "PENDING"; +const char *dotdot = ".."; enum { ORPH_OP_UNLINK, @@ -182,6 +183,41 @@ static inline void mdd_orphan_ref_del(const struct lu_env *env, } +int orph_declare_index_insert(const struct lu_env *env, + struct mdd_object *obj, + struct thandle *th) +{ + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + int rc; + + rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th); + if (rc) + return rc; + + rc = mdo_declare_ref_add(env, obj, th); + if (rc) + return rc; + + if (!S_ISDIR(mdd_object_type(obj))) + return 0; + + rc = mdo_declare_ref_add(env, obj, th); + if (rc) + return rc; + + rc = dt_declare_ref_add(env, mdd->mdd_orphans, th); + if (rc) + return rc; + + rc = mdo_declare_index_delete(env, obj, dotdot, th); + if (rc) + return rc; + + rc = mdo_declare_index_insert(env, obj, NULL, dotdot, th); + + return rc; +} + static int orph_index_insert(const struct lu_env *env, struct mdd_object *obj, __u32 op, @@ -191,7 +227,6 @@ static int orph_index_insert(const struct lu_env *env, struct dt_object *dor = mdd->mdd_orphans; const struct lu_fid *lf_dor = lu_object_fid(&dor->do_lu); struct dt_object *next = mdd_object_child(obj); - const struct dt_key *dotdot = (const struct dt_key *) ".."; int rc; ENTRY; @@ -217,11 +252,13 @@ static int orph_index_insert(const struct lu_env *env, if (!dt_try_as_dir(env, next)) goto out; next->do_index_ops->dio_delete(env, next, - dotdot, th, BYPASS_CAPA); + (const struct dt_key *)dotdot, + th, BYPASS_CAPA); next->do_index_ops->dio_insert(env, next, - (struct dt_rec *) lf_dor, - dotdot, th, BYPASS_CAPA, 1); + (struct dt_rec *)lf_dor, + (const struct dt_key *)dotdot, + th, BYPASS_CAPA, 1); out: if (rc == 0) @@ -264,9 +301,36 @@ static int orphan_object_kill(const struct lu_env *env, if (rc == 0) rc = mdd_lov_destroy(env, mdd, obj, la); } + mdo_destroy(env, obj, th); RETURN(rc); } +int orph_declare_index_delete(const struct lu_env *env, + struct mdd_object *obj, + struct thandle *th) +{ + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + int rc; + + rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, obj, th); + if (rc) + return rc; + + if (S_ISDIR(mdd_object_type(obj))) { + rc = mdo_declare_ref_del(env, obj, th); + if (rc) + return rc; + + rc = dt_declare_ref_del(env, mdd->mdd_orphans, th); + } + + return rc; +} + static int orph_index_delete(const struct lu_env *env, struct mdd_object *obj, __u32 op, @@ -330,12 +394,22 @@ static int orphan_object_destroy(const struct lu_env *env, ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE; ma->ma_valid = 0; - mdd_log_txn_param_build(env, &obj->mod_obj, ma, MDD_TXN_UNLINK_OP, 0); - th = mdd_trans_start(env, mdd); + th = mdd_trans_create(env, mdd); if (IS_ERR(th)) { CERROR("Cannot get thandle\n"); RETURN(-ENOMEM); } + rc = orph_declare_index_delete(env, obj, th); + if (rc) + GOTO(stop, rc); + + rc = mdd_declare_object_kill(env, obj, ma, th); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, th); + if (rc) + GOTO(stop, rc); mdd_write_lock(env, obj, MOR_TGT_CHILD); if (likely(obj->mod_count == 0)) { @@ -348,6 +422,8 @@ static int orphan_object_destroy(const struct lu_env *env, mdd_orphan_write_unlock(env, mdd); } mdd_write_unlock(env, obj); + +stop: mdd_trans_stop(env, mdd, 0, th); RETURN(rc); diff --git a/lustre/mdd/mdd_trans.c b/lustre/mdd/mdd_trans.c index c31ede1..393cd99 100644 --- a/lustre/mdd/mdd_trans.c +++ b/lustre/mdd/mdd_trans.c @@ -70,26 +70,6 @@ #include "mdd_internal.h" -static int dto_txn_credits[DTO_NR]; - -int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param, - void *cookie) -{ - struct mdd_device *mdd = cookie; - struct obd_device *obd = mdd2obd_dev(mdd); - /* Each transaction updates lov objids, the credits should be added for - * this */ - int blk, shift = mdd->mdd_dt_conf.ddp_block_shift; - blk = ((obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id) + - (1 << shift) - 1) >> shift) + 1; - - /* add lov objids credits */ - param->tp_credits += blk * dto_txn_credits[DTO_WRITE_BLOCK] + - dto_txn_credits[DTO_WRITE_BASE]; - - return 0; -} - int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn, void *cookie) { @@ -100,202 +80,16 @@ int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn, return mds_lov_write_objids(obd); } -void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd, - enum mdd_txn_op op, int changelog_cnt) +struct thandle *mdd_trans_create(const struct lu_env *env, + struct mdd_device *mdd) { - LASSERT(0 <= op && op < MDD_TXN_LAST_OP); - - txn_param_init(&mdd_env_info(env)->mti_param, - mdd->mdd_tod[op].mod_credits); - if (changelog_cnt > 0) { - txn_param_credit_add(&mdd_env_info(env)->mti_param, - changelog_cnt * dto_txn_credits[DTO_LOG_REC]); - } + return mdd_child_ops(mdd)->dt_trans_create(env, mdd->mdd_child); } -int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd, - struct lov_mds_md *lmm, enum mdd_txn_op op, - int changelog_cnt) +int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd, + struct thandle *th) { - int stripes = 0; - ENTRY; - - LASSERT(op == MDD_TXN_CREATE_DATA_OP || op == MDD_TXN_MKDIR_OP); - - if (lmm == NULL) - GOTO(out, 0); - /* only replay create request will cause lov_objid update */ - if (!mdd->mdd_obd_dev->obd_recovering) - GOTO(out, 0); - - /* add possible orphan unlink rec credits used in lov_objid update */ - if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1) { - stripes = le32_to_cpu(((struct lov_mds_md_v1*)lmm) - ->lmm_stripe_count); - } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3){ - stripes = le32_to_cpu(((struct lov_mds_md_v3*)lmm) - ->lmm_stripe_count); - } else { - CERROR("Unknown lmm type %X\n", le32_to_cpu(lmm->lmm_magic)); - LBUG(); - } -out: - mdd_txn_param_build(env, mdd, op, stripes + changelog_cnt); - RETURN(0); -} - -int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj, - struct md_attr *ma, enum mdd_txn_op op, - int changelog_cnt) -{ - struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj); - int rc, stripe = 0; - ENTRY; - - if (S_ISDIR(lu_object_attr(&obj->mo_lu))) - GOTO(out, rc = 0); - - LASSERT(op == MDD_TXN_UNLINK_OP || op == MDD_TXN_RENAME_OP || - op == MDD_TXN_RENAME_TGT_OP); - rc = mdd_lmm_get_locked(env, md2mdd_obj(obj), ma); - if (rc || !(ma->ma_valid & MA_LOV)) - GOTO(out, rc); - - LASSERTF(le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1 || - le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3, - "%08x", le32_to_cpu(ma->ma_lmm->lmm_magic)); - - if ((int)le32_to_cpu(ma->ma_lmm->lmm_stripe_count) < 0) - stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count; - else - stripe = le32_to_cpu(ma->ma_lmm->lmm_stripe_count); - -out: - mdd_txn_param_build(env, mdd, op, stripe + changelog_cnt); - - RETURN(rc); -} - -void mdd_setattr_txn_param_build(const struct lu_env *env, - struct md_object *obj, - struct md_attr *ma, enum mdd_txn_op op, - int changelog_cnt) -{ - struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj); - - mdd_txn_param_build(env, mdd, op, changelog_cnt); - if (ma->ma_attr.la_valid & (LA_UID | LA_GID)) - txn_param_credit_add(&mdd_env_info(env)->mti_param, - dto_txn_credits[DTO_ATTR_SET_CHOWN]); -} - -static void mdd_txn_init_dto_credits(const struct lu_env *env, - struct mdd_device *mdd, int *dto_credits) -{ - int op, credits; - for (op = 0; op < DTO_NR; op++) { - credits = mdd_child_ops(mdd)->dt_credit_get(env, mdd->mdd_child, - op); - LASSERT(credits >= 0); - dto_txn_credits[op] = credits; - } -} - -int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd) -{ - int op; - - /* Init credits for each ops. */ - mdd_txn_init_dto_credits(env, mdd, dto_txn_credits); - - /* Calculate the mdd credits. */ - for (op = MDD_TXN_OBJECT_DESTROY_OP; op < MDD_TXN_LAST_OP; op++) { - int *c = &mdd->mdd_tod[op].mod_credits; - int *dt = dto_txn_credits; - mdd->mdd_tod[op].mod_op = op; - switch(op) { - case MDD_TXN_OBJECT_DESTROY_OP: - /* Unused now */ - *c = dt[DTO_OBJECT_DELETE]; - break; - case MDD_TXN_OBJECT_CREATE_OP: - /* OI INSERT + CREATE OBJECT */ - *c = dt[DTO_INDEX_INSERT] + - dt[DTO_OBJECT_CREATE]; - break; - case MDD_TXN_ATTR_SET_OP: - /* ATTR set + XATTR(lsm, lmv) set */ - *c = dt[DTO_ATTR_SET_BASE] + - dt[DTO_XATTR_SET]; - break; - case MDD_TXN_XATTR_SET_OP: - *c = dt[DTO_XATTR_SET]; - break; - case MDD_TXN_INDEX_INSERT_OP: - *c = dt[DTO_INDEX_INSERT]; - break; - case MDD_TXN_INDEX_DELETE_OP: - *c = dt[DTO_INDEX_DELETE]; - break; - case MDD_TXN_LINK_OP: - *c = dt[DTO_INDEX_INSERT]; - break; - case MDD_TXN_UNLINK_OP: - /* delete index + Unlink log + - * mdd orphan handling */ - *c = dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_INSERT] * 2 + - dt[DTO_XATTR_SET] * 3; - break; - case MDD_TXN_RENAME_OP: - /* 2 delete index + 1 insert + Unlink log */ - *c = 2 * dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_INSERT] + - dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_INSERT] * 2 + - dt[DTO_XATTR_SET] * 3; - break; - case MDD_TXN_RENAME_TGT_OP: - /* index insert + index delete */ - *c = dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_INSERT] + - dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_INSERT] * 2 + - dt[DTO_XATTR_SET] * 3; - break; - case MDD_TXN_CREATE_DATA_OP: - /* same as set xattr(lsm) */ - *c = dt[DTO_XATTR_SET]; - break; - case MDD_TXN_MKDIR_OP: - /* INDEX INSERT + OI INSERT + - * CREATE_OBJECT_CREDITS - * SET_MD CREDITS is already counted in - * CREATE_OBJECT CREDITS - */ - *c = 2 * dt[DTO_INDEX_INSERT] + - dt[DTO_OBJECT_CREATE]; - break; - case MDD_TXN_CLOSE_OP: - *c = 0; - break; - default: - CERROR("Invalid op %d init its credit\n", op); - LBUG(); - } - } - RETURN(0); -} - -struct thandle* mdd_trans_start(const struct lu_env *env, - struct mdd_device *mdd) -{ - struct txn_param *p = &mdd_env_info(env)->mti_param; - struct thandle *th; - - th = mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p); - return th; + return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, th); } void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd, diff --git a/lustre/mdt/mdt_capa.c b/lustre/mdt/mdt_capa.c index 578c3e8..968121d 100644 --- a/lustre/mdt/mdt_capa.c +++ b/lustre/mdt/mdt_capa.c @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * Copyright (c) 2011 Whamcloud, Inc. @@ -92,11 +93,19 @@ static int write_capa_keys(const struct lu_env *env, int i, rc; mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - mdt_trans_credit_init(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_OP); - th = mdt_trans_start(env, mdt); + th = mdt_trans_create(env, mdt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + rc = dt_declare_record_write(env, mdt->mdt_ck_obj, + sizeof(*tmp) * 3, 0, th); + if (rc) + goto stop; + + rc = mdt_trans_start(env, mdt, th); + if (rc) + goto stop; + tmp = &mti->mti_capa_key; for (i = 0; i < 2; i++) { @@ -109,6 +118,7 @@ static int write_capa_keys(const struct lu_env *env, break; } +stop: mdt_trans_stop(env, mdt, th); CDEBUG(D_INFO, "write capability keys rc = %d:\n", rc); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 0a98f73..f6444c3 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -5459,7 +5459,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION; rc = -ENOENT; } else { - version = mo_version_get(mti->mti_env, mdt_object_child(obj)); + version = dt_version_get(mti->mti_env, mdt_obj2dt(obj)); *(__u64 *)data->ioc_inlbuf2 = version; rc = 0; } diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index aee5a99..f4a1a31 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -391,7 +391,6 @@ struct mdt_thread_info { struct lr_server_data mti_lsd; struct lsd_client_data mti_lcd; loff_t mti_off; - struct txn_param mti_txn_param; struct lu_buf mti_buf; struct lustre_capa_key mti_capa_key; @@ -603,11 +602,10 @@ int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *, const struct md_attr *); void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *); -void mdt_trans_credit_init(const struct lu_env *env, - struct mdt_device *mdt, - enum mdt_txn_op op); -struct thandle* mdt_trans_start(const struct lu_env *env, - struct mdt_device *mdt); +struct thandle *mdt_trans_create(const struct lu_env *env, + struct mdt_device *mdt); +int mdt_trans_start(const struct lu_env *env, struct mdt_device *mdt, + struct thandle *th); void mdt_trans_stop(const struct lu_env *env, struct mdt_device *mdt, struct thandle *th); int mdt_record_write(const struct lu_env *env, @@ -650,6 +648,16 @@ static inline struct mdt_device *mdt_dev(struct lu_device *d) return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev); } +static inline struct dt_object *mdt_obj2dt(struct mdt_object *mo) +{ + struct lu_object *lo; + struct mdt_device *mdt = mdt_dev(mo->mot_obj.mo_lu.lo_dev); + + lo = lu_object_locate(mo->mot_obj.mo_lu.lo_header, + mdt->mdt_bottom->dd_lu_dev.ld_type); + return lu2dt(lo); +} + /* mdt/mdt_identity.c */ #define MDT_IDENTITY_UPCALL_PATH "/usr/sbin/l_getidentity" diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 8ae3631..abc604c 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -80,50 +80,16 @@ const struct lu_buf *mdt_buf_const(const struct lu_env *env, return buf; } -static inline int mdt_trans_credit_get(const struct lu_env *env, - struct mdt_device *mdt, - enum mdt_txn_op op) -{ - struct dt_device *dev = mdt->mdt_bottom; - int cr; - switch (op) { - case MDT_TXN_CAPA_KEYS_WRITE_OP: - case MDT_TXN_LAST_RCVD_WRITE_OP: - cr = dev->dd_ops->dt_credit_get(env, - dev, - DTO_WRITE_BLOCK); - break; - default: - LBUG(); - } - return cr; -} - -void mdt_trans_credit_init(const struct lu_env *env, - struct mdt_device *mdt, - enum mdt_txn_op op) +struct thandle *mdt_trans_create(const struct lu_env *env, + struct mdt_device *mdt) { - struct mdt_thread_info *mti; - struct txn_param *p; - int cr; - - mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - p = &mti->mti_txn_param; - - cr = mdt_trans_credit_get(env, mdt, op); - txn_param_init(p, cr); + return mdt->mdt_bottom->dd_ops->dt_trans_create(env, mdt->mdt_bottom); } -struct thandle* mdt_trans_start(const struct lu_env *env, - struct mdt_device *mdt) +int mdt_trans_start(const struct lu_env *env, struct mdt_device *mdt, + struct thandle *th) { - struct mdt_thread_info *mti; - struct txn_param *p; - - mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - p = &mti->mti_txn_param; - - return dt_trans_start(env, mdt->mdt_bottom, p); + return dt_trans_start(env, mdt->mdt_bottom, th); } void mdt_trans_stop(const struct lu_env *env, @@ -159,6 +125,18 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env, return rc; } +static int mdt_declare_last_rcvd_header_write(const struct lu_env *env, + struct mdt_device *mdt, + struct thandle *th) +{ + struct mdt_thread_info *mti; + + mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + + return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd, + sizeof(mti->mti_lsd), 0, th); +} + static int mdt_last_rcvd_header_write(const struct lu_env *env, struct mdt_device *mdt, struct thandle *th) @@ -212,6 +190,14 @@ static int mdt_last_rcvd_read(const struct lu_env *env, struct mdt_device *mdt, return rc; } +static int mdt_declare_last_rcvd_write(const struct lu_env *env, + struct mdt_device *mdt, + loff_t off, struct thandle *th) +{ + return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd, + sizeof(struct lsd_client_data), off, th); +} + static int mdt_last_rcvd_write(const struct lu_env *env, struct mdt_device *mdt, struct lsd_client_data *lcd, @@ -503,11 +489,18 @@ static int mdt_server_data_update(const struct lu_env *env, mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP); - th = mdt_trans_start(env, mdt); + th = mdt_trans_create(env, mdt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + rc = mdt_declare_last_rcvd_header_write(env, mdt, th); + if (rc) + goto out; + + rc = mdt_trans_start(env, mdt, th); + if (rc) + goto out; + CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n", mdt->mdt_lut.lut_obd->u.obt.obt_mount_count, mdt->mdt_lut.lut_last_transno); @@ -517,6 +510,8 @@ static int mdt_server_data_update(const struct lu_env *env, cfs_spin_unlock(&mdt->mdt_lut.lut_translock); rc = mdt_last_rcvd_header_write(env, mdt, th); + +out: mdt_trans_stop(env, mdt, th); return rc; } @@ -575,12 +570,18 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt) if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD)) RETURN(-ENOSPC); - mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP); - - th = mdt_trans_start(env, mdt); + th = mdt_trans_create(env, mdt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + rc = mdt_declare_last_rcvd_write(env, mdt, off, th); + if (rc) + GOTO(stop, rc); + + rc = mdt_trans_start(env, mdt, th); + if (rc) + GOTO(stop, rc); + /* * Until this operations will be committed the sync is needed * for this export. This should be done _after_ starting the @@ -600,6 +601,8 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt) rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th); CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n", cl_idx, ted->ted_lr_off, (int)sizeof(*(ted->ted_lcd))); + +stop: mdt_trans_stop(env, mdt, th); RETURN(rc); @@ -709,15 +712,24 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) * be in server data or in client data in case of failure */ mdt_server_data_update(env, mdt); - mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP); - th = mdt_trans_start(env, mdt); + th = mdt_trans_create(env, mdt); if (IS_ERR(th)) GOTO(free, rc = PTR_ERR(th)); + rc = mdt_declare_last_rcvd_write(env, mdt, off, th); + if (rc) + GOTO(stop, rc); + + rc = mdt_trans_start(env, mdt, th); + if (rc) + GOTO(stop, rc); + cfs_mutex_down(&ted->ted_lcd_lock); memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid); rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th); cfs_mutex_up(&ted->ted_lcd_lock); + +stop: mdt_trans_stop(env, mdt, th); CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in " @@ -831,23 +843,13 @@ extern struct lu_context_key mdt_thread_key; /* add credits for last_rcvd update */ static int mdt_txn_start_cb(const struct lu_env *env, - struct txn_param *param, void *cookie) + struct thandle *th, void *cookie) { struct mdt_device *mdt = cookie; - param->tp_credits += mdt_trans_credit_get(env, mdt, - MDT_TXN_LAST_RCVD_WRITE_OP); - return 0; -} - -/* Set new object versions */ -static void mdt_version_set(struct mdt_thread_info *info) -{ - if (info->mti_mos != NULL) { - mo_version_set(info->mti_env, mdt_object_child(info->mti_mos), - info->mti_transno); - info->mti_mos = NULL; - } + /* XXX: later we'll be declaring this at specific offset */ + return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd, + sizeof(struct lsd_client_data), 0, th); } /* Update last_rcvd records with latests transaction data */ @@ -893,8 +895,11 @@ static int mdt_txn_stop_cb(const struct lu_env *env, LASSERT(req != NULL && req->rq_repmsg != NULL); /** VBR: set new versions */ - if (txn->th_result == 0) - mdt_version_set(mti); + if (txn->th_result == 0 && mti->mti_mos != NULL) { + dt_version_set(env, mdt_obj2dt(mti->mti_mos), + mti->mti_transno, txn); + mti->mti_mos = NULL; + } /* filling reply data */ CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n", diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 3c4c011..ec2e5d8 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -110,7 +110,7 @@ static void mdt_obj_version_get(struct mdt_thread_info *info, LASSERT(o); LASSERT(mdt_object_exists(o) >= 0); if (mdt_object_exists(o) > 0) - *version = mo_version_get(info->mti_env, mdt_object_child(o)); + *version = dt_version_get(info->mti_env, mdt_obj2dt(o)); else *version = ENOENT_VERSION; CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n", diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 3d08d2f..4ecf61e 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -28,6 +28,7 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -89,21 +90,23 @@ void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb) EXPORT_SYMBOL(dt_txn_callback_del); int dt_txn_hook_start(const struct lu_env *env, - struct dt_device *dev, struct txn_param *param) + struct dt_device *dev, struct thandle *th) { - int result; + int rc = 0; struct dt_txn_callback *cb; - result = 0; + if (th->th_local) + return 0; + cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { if (cb->dtc_txn_start == NULL || !(cb->dtc_tag & env->le_ctx.lc_tags)) continue; - result = cb->dtc_txn_start(env, param, cb->dtc_cookie); - if (result < 0) + rc = cb->dtc_txn_start(env, th, cb->dtc_cookie); + if (rc < 0) break; } - return result; + return rc; } EXPORT_SYMBOL(dt_txn_hook_start); @@ -111,18 +114,20 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn) { struct dt_device *dev = txn->th_dev; struct dt_txn_callback *cb; - int result; + int rc = 0; + + if (txn->th_local) + return 0; - result = 0; cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { if (cb->dtc_txn_stop == NULL || !(cb->dtc_tag & env->le_ctx.lc_tags)) continue; - result = cb->dtc_txn_stop(env, txn, cb->dtc_cookie); - if (result < 0) + rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie); + if (rc < 0) break; } - return result; + return rc; } EXPORT_SYMBOL(dt_txn_hook_stop); @@ -130,6 +135,9 @@ void dt_txn_hook_commit(struct thandle *txn) { struct dt_txn_callback *cb; + if (txn->th_local) + return; + cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks, dtc_linkage) { if (cb->dtc_txn_commit) @@ -200,31 +208,21 @@ enum dt_format_type dt_mode_to_dft(__u32 mode) } return result; } - EXPORT_SYMBOL(dt_mode_to_dft); + /** * lookup fid for object named \a name in directory \a dir. */ -static int dt_lookup(const struct lu_env *env, struct dt_object *dir, - const char *name, struct lu_fid *fid) +int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir, + const char *name, struct lu_fid *fid) { - struct dt_rec *rec = (struct dt_rec *)fid; - const struct dt_key *key = (const struct dt_key *)name; - int result; - - if (dt_try_as_dir(env, dir)) { - result = dir->do_index_ops->dio_lookup(env, dir, rec, key, - BYPASS_CAPA); - if (result > 0) - result = 0; - else if (result == 0) - result = -ENOENT; - } else - result = -ENOTDIR; - return result; + if (dt_try_as_dir(env, dir)) + return dt_lookup(env, dir, (struct dt_rec *)fid, + (const struct dt_key *)name, BYPASS_CAPA); + return -ENOTDIR; } - +EXPORT_SYMBOL(dt_lookup_dir); /** * get object for given \a fid. */ @@ -257,7 +255,7 @@ static int dt_find_entry(const struct lu_env *env, const char *entry, void *data struct dt_object *obj = dfh->dfh_o; int result; - result = dt_lookup(env, obj, entry, fid); + result = dt_lookup_dir(env, obj, entry, fid); lu_object_put(env, &obj->do_lu); if (result == 0) { obj = dt_locate(env, dt, fid); @@ -341,7 +339,7 @@ static struct dt_object *dt_reg_open(const struct lu_env *env, struct dt_object *o; int result; - result = dt_lookup(env, p, name, fid); + result = dt_lookup_dir(env, p, name, fid); if (result == 0){ o = dt_locate(env, dt, fid); } @@ -416,6 +414,8 @@ int dt_record_write(const struct lu_env *env, struct dt_object *dt, LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); LASSERT(th != NULL); + LASSERT(dt->do_body_ops); + LASSERT(dt->do_body_ops->dbo_write); rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1); if (rc == buf->lb_len) rc = 0; @@ -425,5 +425,56 @@ int dt_record_write(const struct lu_env *env, struct dt_object *dt, } EXPORT_SYMBOL(dt_record_write); +int dt_declare_version_set(const struct lu_env *env, struct dt_object *o, + struct thandle *th) +{ + struct lu_buf vbuf; + char *xname = XATTR_NAME_VERSION; + + LASSERT(o); + vbuf.lb_buf = NULL; + vbuf.lb_len = sizeof(dt_obj_version_t); + return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th); + +} +EXPORT_SYMBOL(dt_declare_version_set); + +void dt_version_set(const struct lu_env *env, struct dt_object *o, + dt_obj_version_t version, struct thandle *th) +{ + struct lu_buf vbuf; + char *xname = XATTR_NAME_VERSION; + int rc; + + LASSERT(o); + vbuf.lb_buf = &version; + vbuf.lb_len = sizeof(version); + + rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA); + if (rc != 0) + CDEBUG(D_INODE, "Can't set version, rc %d\n", rc); + return; +} +EXPORT_SYMBOL(dt_version_set); + +dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o) +{ + struct lu_buf vbuf; + char *xname = XATTR_NAME_VERSION; + dt_obj_version_t version; + int rc; + + LASSERT(o); + vbuf.lb_buf = &version; + vbuf.lb_len = sizeof(version); + rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA); + if (rc != sizeof(version)) { + CDEBUG(D_INODE, "Can't get version, rc %d\n", rc); + version = 0; + } + return version; +} +EXPORT_SYMBOL(dt_version_get); + const struct dt_index_features dt_directory_features; EXPORT_SYMBOL(dt_directory_features); diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index bffa028..45f92aa 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -138,9 +138,25 @@ static struct lu_context_key osd_key; static const struct dt_object_operations osd_obj_ops; static const struct dt_object_operations osd_obj_ea_ops; static const struct dt_body_operations osd_body_ops; +static const struct dt_body_operations osd_body_ops_new; static const struct dt_index_operations osd_index_iam_ops; static const struct dt_index_operations osd_index_ea_ops; +#define OSD_TRACK_DECLARES +#ifdef OSD_TRACK_DECLARES +#define OSD_DECLARE_OP(oh, op) { \ + LASSERT(oh->ot_handle == NULL); \ + ((oh)->ot_declare_ ##op)++; } +#define OSD_EXEC_OP(handle, op) { \ + struct osd_thandle *oh; \ + oh = container_of0(handle, struct osd_thandle, ot_super);\ + LASSERT((oh)->ot_declare_ ##op > 0); \ + ((oh)->ot_declare_ ##op)--; } +#else +#define OSD_DECLARE_OP(oh, op) +#define OSD_EXEC_OP(oh, op) +#endif + struct osd_thandle { struct thandle ot_super; handle_t *ot_handle; @@ -148,6 +164,20 @@ struct osd_thandle { cfs_list_t ot_dcb_list; /* Link to the device, for debugging. */ struct lu_ref_link *ot_dev_link; + int ot_credits; + +#ifdef OSD_TRACK_DECLARES + unsigned char ot_declare_attr_set; + unsigned char ot_declare_punch; + unsigned char ot_declare_xattr_set; + unsigned char ot_declare_create; + unsigned char ot_declare_destroy; + unsigned char ot_declare_ref_add; + unsigned char ot_declare_ref_del; + unsigned char ot_declare_write; + unsigned char ot_declare_insert; + unsigned char ot_declare_delete; +#endif #if OSD_THANDLE_STATS /** time when this handle was allocated */ @@ -158,6 +188,25 @@ struct osd_thandle { #endif }; +/** + * Basic transaction credit op + */ +enum dt_txn_op { + DTO_INDEX_INSERT, + DTO_INDEX_DELETE, + DTO_INDEX_UPDATE, + DTO_OBJECT_CREATE, + DTO_OBJECT_DELETE, + DTO_ATTR_SET_BASE, + DTO_XATTR_SET, + DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */ + DTO_WRITE_BASE, + DTO_WRITE_BLOCK, + DTO_ATTR_SET_CHOWN, + + DTO_NR +}; + /* * Helpers. */ @@ -474,6 +523,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l, LINVRNT(osd_invariant(obj)); result = osd_fid_lookup(env, obj, lu_object_fid(l)); + obj->oo_dt.do_body_ops = &osd_body_ops_new; if (result == 0) { if (obj->oo_inode != NULL) osd_object_init0(obj); @@ -641,9 +691,11 @@ static void __osd_th_check_slow(void *oth, struct osd_device *dev, * Concurrency: doesn't access mutable data. */ static int osd_param_is_sane(const struct osd_device *dev, - const struct txn_param *param) + const struct thandle *th) { - return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers; + struct osd_thandle *oh; + oh = container_of0(th, struct osd_thandle, ot_super); + return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers; } /* @@ -681,86 +733,129 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error) OBD_FREE_PTR(oh); } +static struct thandle *osd_trans_create(const struct lu_env *env, + struct dt_device *d) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_thandle *oh; + struct thandle *th; + ENTRY; + + th = ERR_PTR(-ENOMEM); + OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); + if (oh != NULL) { + th = &oh->ot_super; + th->th_dev = d; + th->th_result = 0; + th->th_tags = LCT_TX_HANDLE; + oh->ot_credits = 0; + oti->oti_dev = osd_dt_dev(d); + CFS_INIT_LIST_HEAD(&oh->ot_dcb_list); + osd_th_alloced(oh); + } + RETURN(th); +} + /* * Concurrency: shouldn't matter. */ -static struct thandle *osd_trans_start(const struct lu_env *env, - struct dt_device *d, - struct txn_param *p) +int osd_trans_start(const struct lu_env *env, struct dt_device *d, + struct thandle *th) { + struct osd_thread_info *oti = osd_oti_get(env); struct osd_device *dev = osd_dt_dev(d); handle_t *jh; struct osd_thandle *oh; - struct thandle *th; - int hook_res; + int rc; ENTRY; - hook_res = dt_txn_hook_start(env, d, p); - if (hook_res != 0) - RETURN(ERR_PTR(hook_res)); + LASSERT(current->journal_info == NULL); - if (osd_param_is_sane(dev, p)) { - OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); - if (oh != NULL) { - struct osd_thread_info *oti = osd_oti_get(env); + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh != NULL); + LASSERT(oh->ot_handle == NULL); - /* - * XXX temporary stuff. Some abstraction layer should - * be used. - */ - oti->oti_dev = dev; - CFS_INIT_LIST_HEAD(&oh->ot_dcb_list); - osd_th_alloced(oh); - jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits); - osd_th_started(oh); - if (!IS_ERR(jh)) { - oh->ot_handle = jh; - th = &oh->ot_super; - th->th_dev = d; - th->th_result = 0; - th->th_sync = 0; - lu_device_get(&d->dd_lu_dev); - oh->ot_dev_link = lu_ref_add - (&d->dd_lu_dev.ld_reference, - "osd-tx", th); - /* add commit callback */ - lu_context_init(&th->th_ctx, LCT_TX_HANDLE); - lu_context_enter(&th->th_ctx); - LASSERT(oti->oti_txns == 0); - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - oti->oti_txns++; - } else { - OBD_FREE_PTR(oh); - th = (void *)jh; - } - } else - th = ERR_PTR(-ENOMEM); - } else { - CERROR("Invalid transaction parameters\n"); - th = ERR_PTR(-EINVAL); + rc = dt_txn_hook_start(env, d, th); + if (rc != 0) + GOTO(out, rc); + + oh->ot_credits += LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev)); + + if (!osd_param_is_sane(dev, th)) { + CWARN("%s: too many transaction credits (%d > %d)\n", + d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits, + osd_journal(dev)->j_max_transaction_buffers); +#ifdef OSD_TRACK_DECLARES + CERROR(" attr_set: %d, punch: %d, xattr_set: %d,\n", + oh->ot_declare_attr_set, oh->ot_declare_punch, + oh->ot_declare_xattr_set); + CERROR(" create: %d, ref_add: %d, ref_del: %d, write: %d\n", + oh->ot_declare_create, oh->ot_declare_ref_add, + oh->ot_declare_ref_del, oh->ot_declare_write); + CERROR(" insert: %d, delete: %d\n", + oh->ot_declare_insert, oh->ot_declare_delete); +#endif } - RETURN(th); + /* + * XXX temporary stuff. Some abstraction layer should + * be used. + */ + jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits); + osd_th_started(oh); + if (!IS_ERR(jh)) { + oh->ot_handle = jh; + LASSERT(oti->oti_txns == 0); + lu_context_init(&th->th_ctx, th->th_tags); + lu_context_enter(&th->th_ctx); + + lu_device_get(&d->dd_lu_dev); + oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference, + "osd-tx", th); + + /* + * XXX: current rule is that we first start tx, + * then lock object(s), but we can't use + * this rule for data (due to locking specifics + * in ldiskfs). also in long-term we'd like to + * use usually-used (locks;tx) ordering. so, + * UGLY thing is that we'll use one ordering for + * data (ofd) and reverse ordering for metadata + * (mdd). then at some point we'll fix the latter + */ + if (lu_device_is_md(&d->dd_lu_dev)) { + LASSERT(oti->oti_r_locks == 0); + LASSERT(oti->oti_w_locks == 0); + } + + oti->oti_txns++; + rc = 0; + } else { + rc = PTR_ERR(jh); + } +out: + RETURN(rc); } /* * Concurrency: shouldn't matter. */ -static void osd_trans_stop(const struct lu_env *env, struct thandle *th) +static int osd_trans_stop(const struct lu_env *env, struct thandle *th) { - int result; - struct osd_thandle *oh; + int rc = 0; + struct osd_thandle *oh; struct osd_thread_info *oti = osd_oti_get(env); ENTRY; oh = container_of0(th, struct osd_thandle, ot_super); + if (oh->ot_handle != NULL) { handle_t *hdl = oh->ot_handle; hdl->h_sync = th->th_sync; + /* * add commit callback * notice we don't do this in osd_trans_start() @@ -771,20 +866,33 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th) LASSERT(oti->oti_txns == 1); oti->oti_txns--; - LASSERT(oti->oti_r_locks == 0); - LASSERT(oti->oti_w_locks == 0); - result = dt_txn_hook_stop(env, th); - if (result != 0) - CERROR("Failure in transaction hook: %d\n", result); + /* + * XXX: current rule is that we first start tx, + * then lock object(s), but we can't use + * this rule for data (due to locking specifics + * in ldiskfs). also in long-term we'd like to + * use usually-used (locks;tx) ordering. so, + * UGLY thing is that we'll use one ordering for + * data (ofd) and reverse ordering for metadata + * (mdd). then at some point we'll fix the latter + */ + if (lu_device_is_md(&th->th_dev->dd_lu_dev)) { + LASSERT(oti->oti_r_locks == 0); + LASSERT(oti->oti_w_locks == 0); + } + rc = dt_txn_hook_stop(env, th); + if (rc != 0) + CERROR("Failure in transaction hook: %d\n", rc); oh->ot_handle = NULL; OSD_CHECK_SLOW_TH(oh, oti->oti_dev, - result = ldiskfs_journal_stop(hdl)); - if (result != 0) - CERROR("Failure to stop transaction: %d\n", result); + rc = ldiskfs_journal_stop(hdl)); + if (rc != 0) + CERROR("Failure to stop transaction: %d\n", rc); } else { OBD_FREE_PTR(oh); } - EXIT; + + RETURN(rc); } static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) @@ -798,35 +906,6 @@ static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) } /* - * Concurrency: no concurrent access is possible that late in object - * life-cycle. - */ -static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj) -{ - const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu); - struct osd_device *osd = osd_obj2dev(obj); - struct osd_thread_info *oti = osd_oti_get(env); - struct txn_param *prm = &oti->oti_txn; - struct lu_env *env_del_obj = &oti->oti_obj_delete_tx_env; - struct thandle *th; - int result; - - lu_env_init(env_del_obj, LCT_DT_THREAD); - txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + - OSD_TXN_INODE_DELETE_CREDITS); - th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm); - if (!IS_ERR(th)) { - result = osd_oi_delete(osd_oti_get(env_del_obj), - &osd->od_oi, fid, th); - osd_trans_stop(env_del_obj, th); - } else - result = PTR_ERR(th); - - lu_env_fini(env_del_obj); - return result; -} - -/* * Called just before object is freed. Releases all resources except for * object itself (that is released by osd_object_free()). * @@ -846,16 +925,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) osd_index_fini(obj); if (inode != NULL) { - int result; - - if (osd_inode_unlinked(inode)) { - result = osd_inode_remove(env, obj); - if (result != 0) - LU_OBJECT_DEBUG(D_ERROR, env, l, - "Failed to cleanup: %d\n", - result); - } - iput(inode); obj->oo_inode = NULL; } @@ -867,11 +936,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) static void osd_object_release(const struct lu_env *env, struct lu_object *l) { - struct osd_object *o = osd_obj(l); - - LASSERT(!lu_object_is_dying(l->lo_header)); - if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode)) - cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags); } /* @@ -996,20 +1060,18 @@ static int osd_commit_async(const struct lu_env *env, /* * Concurrency: shouldn't matter. */ -lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *); static void osd_ro(const struct lu_env *env, struct dt_device *d) { + struct super_block *sb = osd_sb(osd_dt_dev(d)); ENTRY; CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME); - __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))), - fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d)))); + __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev); EXIT; } - /* * Concurrency: serialization provided by callers. */ @@ -1063,7 +1125,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = { /** * Unused now */ - [DTO_IDNEX_UPDATE] = 16, + [DTO_INDEX_UPDATE] = 16, /** * Create a object. The same as create object in EXT3. * DATA_TRANS_BLOCKS(14) + @@ -1072,14 +1134,13 @@ static const int osd_dto_credits_noquota[DTO_NR] = { */ [DTO_OBJECT_CREATE] = 25, /** - * Unused now + * XXX: real credits to be fixed */ [DTO_OBJECT_DELETE] = 25, /** - * Attr set credits. - * 3(inode bits, group, GDT) + * Attr set credits (inode) */ - [DTO_ATTR_SET_BASE] = 3, + [DTO_ATTR_SET_BASE] = 1, /** * Xattr set. The same as xattr of EXT3. * DATA_TRANS_BLOCKS(14) @@ -1089,7 +1150,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = { [DTO_XATTR_SET] = 14, [DTO_LOG_REC] = 14, /** - * creadits for inode change during write. + * credits for inode change during write. */ [DTO_WRITE_BASE] = 3, /** @@ -1123,7 +1184,7 @@ static const int osd_dto_credits_quota[DTO_NR] = { /** * Unused now. */ - [DTO_IDNEX_UPDATE] = 16, + [DTO_INDEX_UPDATE] = 16, /* * Create a object. Same as create object in EXT3 filesystem. * DATA_TRANS_BLOCKS(16) + @@ -1170,23 +1231,10 @@ static const int osd_dto_credits_quota[DTO_NR] = { [DTO_ATTR_SET_CHOWN]= 68, }; -static int osd_credit_get(const struct lu_env *env, struct dt_device *d, - enum dt_txn_op op) -{ - LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) == - ARRAY_SIZE(osd_dto_credits_quota)); - LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota)); -#ifdef HAVE_QUOTA_SUPPORT - if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA)) - return osd_dto_credits_quota[op]; - else -#endif - return osd_dto_credits_noquota[op]; -} - static const struct dt_device_operations osd_dt_ops = { .dt_root_get = osd_root_get, .dt_statfs = osd_statfs, + .dt_trans_create = osd_trans_create, .dt_trans_start = osd_trans_start, .dt_trans_stop = osd_trans_stop, .dt_trans_cb_add = osd_trans_cb_add, @@ -1194,7 +1242,6 @@ static const struct dt_device_operations osd_dt_ops = { .dt_sync = osd_sync, .dt_ro = osd_ro, .dt_commit_async = osd_commit_async, - .dt_credit_get = osd_credit_get, .dt_init_capa_ctxt = osd_init_capa_ctxt, .dt_init_quota_ctxt= osd_init_quota_ctxt, }; @@ -1422,6 +1469,25 @@ static int osd_attr_get(const struct lu_env *env, return 0; } +static int osd_declare_attr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(handle != NULL); + LASSERT(osd_invariant(obj)); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, attr_set); + oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + + return 0; +} + static int osd_inode_setattr(const struct lu_env *env, struct inode *inode, const struct lu_attr *attr) { @@ -1506,6 +1572,8 @@ static int osd_attr_set(const struct lu_env *env, if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; + OSD_EXEC_OP(handle, attr_set); + cfs_spin_lock(&obj->oo_guard); rc = osd_inode_setattr(env, obj->oo_inode, attr); cfs_spin_unlock(&obj->oo_guard); @@ -1824,6 +1892,33 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj, uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); } +static int osd_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, insert); + OSD_DECLARE_OP(oh, create); + oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE]; + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + + /* if this is directory, then we expect . and .. + * to be inserted as well */ + OSD_DECLARE_OP(oh, insert); + OSD_DECLARE_OP(oh, insert); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + return 0; +} + static int osd_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, @@ -1842,6 +1937,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, create); + result = __osd_object_create(info, obj, attr, hint, dof, th); if (result == 0) result = __osd_oi_insert(env, obj, fid, th); @@ -1852,6 +1949,63 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, } /** + * Called to destroy on-disk representation of the object + * + * Concurrency: must be locked + */ +static int osd_declare_object_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thandle *oh; + ENTRY; + + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + LASSERT(inode); + LASSERT(!lu_object_is_dying(dt->do_lu.lo_header)); + + OSD_DECLARE_OP(oh, destroy); + OSD_DECLARE_OP(oh, delete); + oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE]; + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; + + RETURN(0); +} + +static int osd_object_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_device *osd = osd_obj2dev(obj); + struct osd_thandle *oh; + int result; + ENTRY; + + oh = container_of0(th, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle); + LASSERT(inode); + LASSERT(osd_inode_unlinked(inode)); + + OSD_EXEC_OP(th, destroy); + + result = osd_oi_delete(osd_oti_get(env), &osd->od_oi, fid, th); + + /* XXX: add to ext3 orphan list */ + /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */ + + /* not needed in the cache anymore */ + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); + + RETURN(0); +} + +/** * Helper function for osd_xattr_set() */ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, @@ -2044,6 +2198,8 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, create); + result = __osd_object_create(info, obj, attr, hint, dof, th); /* objects under osd root shld have igif fid, so dont add fid EA */ @@ -2058,12 +2214,30 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, RETURN(result); } +static int osd_declare_object_ref_add(const struct lu_env *env, + struct dt_object *dt, + struct thandle *handle) +{ + struct osd_thandle *oh; + + /* it's possible that object doesn't exist yet */ + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, ref_add); + oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + + return 0; +} + /* * Concurrency: @dt is write locked. */ -static void osd_object_ref_add(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int osd_object_ref_add(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -2073,20 +2247,42 @@ static void osd_object_ref_add(const struct lu_env *env, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, ref_add); + cfs_spin_lock(&obj->oo_guard); LASSERT(inode->i_nlink < LDISKFS_LINK_MAX); inode->i_nlink++; cfs_spin_unlock(&obj->oo_guard); inode->i_sb->s_op->dirty_inode(inode); LINVRNT(osd_invariant(obj)); + + return 0; +} + +static int osd_declare_object_ref_del(const struct lu_env *env, + struct dt_object *dt, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, ref_del); + oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + + return 0; } /* * Concurrency: @dt is write locked. */ -static void osd_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int osd_object_ref_del(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -2096,12 +2292,30 @@ static void osd_object_ref_del(const struct lu_env *env, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); + OSD_EXEC_OP(th, ref_del); + cfs_spin_lock(&obj->oo_guard); LASSERT(inode->i_nlink > 0); inode->i_nlink--; cfs_spin_unlock(&obj->oo_guard); inode->i_sb->s_op->dirty_inode(inode); LINVRNT(osd_invariant(obj)); + + return 0; +} + +/* + * Get the 64-bit version for an inode. + */ +static int osd_object_version_get(const struct lu_env *env, + struct dt_object *dt, dt_obj_version_t *ver) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n", + LDISKFS_I(inode)->i_fs_version, inode->i_ino); + *ver = LDISKFS_I(inode)->i_fs_version; + return 0; } /* @@ -2118,6 +2332,15 @@ static int osd_xattr_get(const struct lu_env *env, struct osd_thread_info *info = osd_oti_get(env); struct dentry *dentry = &info->oti_obj_dentry; + /* version get is not real XATTR but uses xattr API */ + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* for version we are just using xattr API but change inode + * field instead */ + LASSERT(buf->lb_len == sizeof(dt_obj_version_t)); + osd_object_version_get(env, dt, buf->lb_buf); + return sizeof(dt_obj_version_t); + } + LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); @@ -2129,6 +2352,47 @@ static int osd_xattr_get(const struct lu_env *env, return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len); } + +static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, + int fl, struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(handle != NULL); + + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* no credits for version */ + return 0; + } + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, xattr_set); + oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET]; + + return 0; +} + +/* + * Set the 64-bit version for object + */ +static void osd_object_version_set(const struct lu_env *env, + struct dt_object *dt, + dt_obj_version_t *new_version) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n", + *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino); + + LDISKFS_I(inode)->i_fs_version = *new_version; + /** Version is set after all inode operations are finished, + * so we should mark it dirty here */ + inode->i_sb->s_op->dirty_inode(inode); +} + /* * Concurrency: @dt is write locked. */ @@ -2138,9 +2402,19 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, { LASSERT(handle != NULL); + /* version set is not real XATTR */ + if (strcmp(name, XATTR_NAME_VERSION) == 0) { + /* for version we are just using xattr API but change inode + * field instead */ + LASSERT(buf->lb_len == sizeof(dt_obj_version_t)); + osd_object_version_set(env, dt, buf->lb_buf); + return sizeof(dt_obj_version_t); + } + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; + OSD_EXEC_OP(handle, xattr_set); return __osd_xattr_set(env, dt, buf, name, fl); } @@ -2168,6 +2442,25 @@ static int osd_xattr_list(const struct lu_env *env, return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len); } +static int osd_declare_xattr_del(const struct lu_env *env, + struct dt_object *dt, + const char *name, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, xattr_set); + oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET]; + + return 0; +} + /* * Concurrency: @dt is write locked. */ @@ -2191,6 +2484,8 @@ static int osd_xattr_del(const struct lu_env *env, if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; + OSD_EXEC_OP(handle, xattr_set); + dentry->d_inode = inode; rc = inode->i_op->removexattr(dentry, name); return rc; @@ -2303,35 +2598,6 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) RETURN(rc); } -/* - * Get the 64-bit version for an inode. - */ -static dt_obj_version_t osd_object_version_get(const struct lu_env *env, - struct dt_object *dt) -{ - struct inode *inode = osd_dt_obj(dt)->oo_inode; - - CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n", - LDISKFS_I(inode)->i_fs_version, inode->i_ino); - return LDISKFS_I(inode)->i_fs_version; -} - -/* - * Set the 64-bit version and return the old version. - */ -static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt, - dt_obj_version_t new_version) -{ - struct inode *inode = osd_dt_obj(dt)->oo_inode; - - CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n", - new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino); - LDISKFS_I(inode)->i_fs_version = new_version; - /** Version is set after all inode operations are finished, - * so we should mark it dirty here */ - inode->i_sb->s_op->dirty_inode(inode); -} - static int osd_data_get(const struct lu_env *env, struct dt_object *dt, void **data) { @@ -2477,27 +2743,33 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, } static const struct dt_object_operations osd_obj_ops = { - .do_read_lock = osd_object_read_lock, - .do_write_lock = osd_object_write_lock, - .do_read_unlock = osd_object_read_unlock, - .do_write_unlock = osd_object_write_unlock, - .do_write_locked = osd_object_write_locked, - .do_attr_get = osd_attr_get, - .do_attr_set = osd_attr_set, - .do_ah_init = osd_ah_init, - .do_create = osd_object_create, - .do_index_try = osd_index_try, - .do_ref_add = osd_object_ref_add, - .do_ref_del = osd_object_ref_del, - .do_xattr_get = osd_xattr_get, - .do_xattr_set = osd_xattr_set, - .do_xattr_del = osd_xattr_del, - .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, - .do_object_sync = osd_object_sync, - .do_version_get = osd_object_version_get, - .do_version_set = osd_object_version_set, - .do_data_get = osd_data_get, + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, + .do_attr_get = osd_attr_get, + .do_declare_attr_set = osd_declare_attr_set, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_declare_create = osd_declare_object_create, + .do_create = osd_object_create, + .do_declare_destroy = osd_declare_object_destroy, + .do_destroy = osd_object_destroy, + .do_index_try = osd_index_try, + .do_declare_ref_add = osd_declare_object_ref_add, + .do_ref_add = osd_object_ref_add, + .do_declare_ref_del = osd_declare_object_ref_del, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_declare_xattr_set = osd_declare_xattr_set, + .do_xattr_set = osd_xattr_set, + .do_declare_xattr_del = osd_declare_xattr_del, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, }; /** @@ -2505,27 +2777,33 @@ static const struct dt_object_operations osd_obj_ops = { * (i.e. to run 2.0 mds on 1.8 disk) (b11826) */ static const struct dt_object_operations osd_obj_ea_ops = { - .do_read_lock = osd_object_read_lock, - .do_write_lock = osd_object_write_lock, - .do_read_unlock = osd_object_read_unlock, - .do_write_unlock = osd_object_write_unlock, - .do_write_locked = osd_object_write_locked, - .do_attr_get = osd_attr_get, - .do_attr_set = osd_attr_set, - .do_ah_init = osd_ah_init, - .do_create = osd_object_ea_create, - .do_index_try = osd_index_try, - .do_ref_add = osd_object_ref_add, - .do_ref_del = osd_object_ref_del, - .do_xattr_get = osd_xattr_get, - .do_xattr_set = osd_xattr_set, - .do_xattr_del = osd_xattr_del, - .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, - .do_object_sync = osd_object_sync, - .do_version_get = osd_object_version_get, - .do_version_set = osd_object_version_set, - .do_data_get = osd_data_get, + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_write_locked = osd_object_write_locked, + .do_attr_get = osd_attr_get, + .do_declare_attr_set = osd_declare_attr_set, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_declare_create = osd_declare_object_create, + .do_create = osd_object_ea_create, + .do_declare_destroy = osd_declare_object_destroy, + .do_destroy = osd_object_destroy, + .do_index_try = osd_index_try, + .do_declare_ref_add = osd_declare_object_ref_add, + .do_ref_add = osd_object_ref_add, + .do_declare_ref_del = osd_declare_object_ref_del, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_declare_xattr_set = osd_declare_xattr_set, + .do_xattr_set = osd_xattr_set, + .do_declare_xattr_del = osd_declare_xattr_del, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_data_get = osd_data_get, }; /* @@ -2704,6 +2982,23 @@ static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize, return err; } +static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, + const loff_t size, loff_t pos, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, write); + oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK]; + + return 0; +} + static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, struct thandle *handle, struct lustre_capa *capa, @@ -2748,11 +3043,35 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, return result; } +/* + * in some cases we may need declare methods for objects being created + * e.g., when we create symlink + */ +static const struct dt_body_operations osd_body_ops_new = { + .dbo_declare_write = osd_declare_write, +}; + static const struct dt_body_operations osd_body_ops = { - .dbo_read = osd_read, - .dbo_write = osd_write + .dbo_read = osd_read, + .dbo_declare_write = osd_declare_write, + .dbo_write = osd_write }; +static int osd_index_declare_iam_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, delete); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; + + return 0; +} /** * delete a (key, value) pair from index \a dt specified by \a key @@ -2786,6 +3105,8 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) RETURN(-EACCES); + OSD_EXEC_OP(handle, delete); + ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) RETURN(-ENOMEM); @@ -2800,6 +3121,25 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static int osd_index_declare_ea_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, delete); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; + + return 0; +} + static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, struct dt_rec *fid) { @@ -2843,6 +3183,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(dt_object_exists(dt)); LASSERT(handle != NULL); + OSD_EXEC_OP(handle, delete); + oh = container_of(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle != NULL); LASSERT(oh->ot_handle->h_transaction != NULL); @@ -2937,6 +3279,26 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static int osd_index_declare_iam_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, insert); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + + return 0; +} + /** * Inserts (key, value) pair in \a dt index object. * @@ -2974,6 +3336,8 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) return -EACCES; + OSD_EXEC_OP(th, insert); + ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) RETURN(-ENOMEM); @@ -3262,6 +3626,26 @@ static inline void osd_object_put(const struct lu_env *env, lu_object_put(env, &obj->oo_dt.do_lu); } +static int osd_index_declare_ea_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + struct osd_thandle *oh; + + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); + + OSD_DECLARE_OP(oh, insert); + oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; + + return 0; +} + /** * Index add function for interoperability mode (b11826). * It will add the directory entry.This entry is needed to @@ -3488,13 +3872,14 @@ static inline void osd_it_pack_dirent(struct lu_dirent *ent, */ static int osd_it_iam_rec(const struct lu_env *env, const struct dt_it *di, - struct lu_dirent *lde, + struct dt_rec *dtrec, __u32 attr) { struct osd_it_iam *it = (struct osd_it_iam *)di; struct osd_thread_info *info = osd_oti_get(env); struct lu_fid *fid = &info->oti_fid; const struct osd_fid_pack *rec; + struct lu_dirent *lde = (struct lu_dirent *)dtrec; char *name; int namelen; __u64 hash; @@ -3553,9 +3938,11 @@ static int osd_it_iam_load(const struct lu_env *env, } static const struct dt_index_operations osd_index_iam_ops = { - .dio_lookup = osd_index_iam_lookup, - .dio_insert = osd_index_iam_insert, - .dio_delete = osd_index_iam_delete, + .dio_lookup = osd_index_iam_lookup, + .dio_declare_insert = osd_index_declare_iam_insert, + .dio_insert = osd_index_iam_insert, + .dio_declare_delete = osd_index_declare_iam_delete, + .dio_delete = osd_index_iam_delete, .dio_it = { .init = osd_it_iam_init, .fini = osd_it_iam_fini, @@ -3840,12 +4227,13 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) */ static inline int osd_it_ea_rec(const struct lu_env *env, const struct dt_it *di, - struct lu_dirent *lde, + struct dt_rec *dtrec, __u32 attr) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; struct lu_fid *fid = &it->oie_dirent->oied_fid; + struct lu_dirent *lde = (struct lu_dirent *)dtrec; int rc = 0; ENTRY; @@ -3938,9 +4326,11 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826) */ static const struct dt_index_operations osd_index_ea_ops = { - .dio_lookup = osd_index_ea_lookup, - .dio_insert = osd_index_ea_insert, - .dio_delete = osd_index_ea_delete, + .dio_lookup = osd_index_ea_lookup, + .dio_declare_insert = osd_index_declare_ea_insert, + .dio_insert = osd_index_ea_insert, + .dio_declare_delete = osd_index_declare_ea_delete, + .dio_delete = osd_index_ea_delete, .dio_it = { .init = osd_it_ea_init, .fini = osd_it_ea_fini, diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index 662f7a1..37fae5a 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -275,7 +275,6 @@ struct osd_thread_info { /* * XXX temporary: for ->i_op calls. */ - struct txn_param oti_txn; struct timespec oti_time; /* * XXX temporary: fake struct file for osd_object_sync diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index 0de17bf..0b4f439 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -28,6 +28,7 @@ /* * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -149,19 +150,25 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, const struct lu_buf *buf, loff_t *off, int sync) { struct thandle *th; - struct txn_param p; - int rc, credits; + int rc; ENTRY; - credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom, - DTO_WRITE_BLOCK); - txn_param_init(&p, credits); - - th = dt_trans_start(env, lut->lut_bottom, &p); + th = dt_trans_create(env, lut->lut_bottom); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + rc = dt_declare_record_write(env, lut->lut_last_rcvd, + buf->lb_len, *off, th); + if (rc) + goto stop; + + rc = dt_trans_start(env, lut->lut_bottom, th); + if (rc) + goto stop; + rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th); + +stop: dt_trans_stop(env, lut->lut_bottom, th); CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n" @@ -239,8 +246,8 @@ int lut_client_data_update(const struct lu_env *env, struct obd_export *exp) /** * Update server data in last_rcvd */ -static int lut_server_data_update(const struct lu_env *env, - struct lu_target *lut, int sync) +int lut_server_data_update(const struct lu_env *env, + struct lu_target *lut, int sync) { struct lr_server_data tmp_lsd; loff_t tmp_off = 0; diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 596e7a4..872d1d0 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -2077,7 +2077,10 @@ run_test 85a "check the cancellation of unused locks during recovery(IBITS)" test_85b() { #bug 16774 lctl set_param -n ldlm.cancel_unused_locks_before_replay "1" - lfs setstripe -o 0 -c 1 $DIR + do_facet mgs $LCTL pool_new $FSNAME.$TESTNAME || return 1 + do_facet mgs $LCTL pool_add $FSNAME.$TESTNAME $FSNAME-OST0000 || return 2 + + lfs setstripe -c 1 -p $FSNAME.$TESTNAME $DIR for i in `seq 100`; do dd if=/dev/urandom of=$DIR/$tfile-$i bs=4096 count=32 >/dev/null 2>&1 @@ -2093,12 +2096,16 @@ test_85b() { #bug 16774 addr=`echo $lov_id | awk '{print $4}' | awk -F '-' '{print $3}'` count=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count` echo "before recovery: unused locks count = $count" + [ $count != 0 ] || return 3 fail ost1 count2=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count` echo "after recovery: unused locks count = $count2" + do_facet mgs $LCTL pool_remove $FSNAME.$TESTNAME $FSNAME-OST0000 || return 4 + do_facet mgs $LCTL pool_destroy $FSNAME.$TESTNAME || return 5 + if [ $count2 -ge $count ]; then error "unused locks are not canceled" fi