From b50bb830f92e87da9bfdc84d14e4f3f78c80c9ac Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Sun, 2 Nov 2014 23:30:23 +0800 Subject: [PATCH] LU-3538 dne: Commit-on-Sharing for DNE This patch contains three parts: 1. Sync-on-Cancel for cross-MDT lock, which eliminates dependency between transactions and distributed transaction which modified remote object, this can guarantee the change of the distributed transaction will not be lost. 2. enable Commit-on-Sharing for DNE, PW/EX locks will be converted to COS locks, but by default they are ignored, when operation finds itself a distributed transaction, it will lock with LDLM_FL_COS_INCOMPAT flag to check against existing COS locks. This will eliminate dependency between distributed transaction and transactions which modify the same local object, and it guarantees distributed transaction can always be recovered. 3. striped directory creation needs to ensure its parent is permanent on disk, to ensure this, cache child locks in mkdir. Sync-on-Cancel for cross-MDT lock When two operations have dependency on an object, and the first operation has a PW/EX cross-MDT lock on this object, trigger transaction commit on the MDT where the object resides to eliminate dependency, in short, this patch eliminates dependency between locks and existing PW/EX cross-MDT lock. This patch contains following changes: * enable Sync on Cancel for DNE by default. * save cross-MDT lock into tgt_uncommitted_soc_locks after use, and it will be released upon transaction commit, note, just a lock refcount is taken when lock is saved, the read/write count is released in mdt_object_unlock(). * the saved cross-MDT lock will be discarded upon BAST, because the MDT where the object resides will do sync on lock cancel. * use existing BLOCKING_SYNC_ON_CANCEL mechanism to commit transaction upon cross-MDT lock cancel. 
Commit-on-Sharing for DNE On DNE, Commit-on-Sharing is disabled by default, but MDT local PW/EX lock will be saved as COS lock, and such lock will be ignored in compatibility check by default, unless it's required, there are two situations: 1. when distributed transaction locks local object, it will conflict with COS locks. 2. when distributed transaction enqueues cross-MDT lock, it will conflict with COS locks. This patch contains following changes: * on DNE, local PW/EX lock is converted to COS and saved like before even when COS is not enabled. * above COS locks will be ignored in lock compatibility check by default, so for local operations COS won't take effect. But if operation finds itself may modify remote MDT object, it will lock all local locks with COS checked. * cross-MDT lock will always conflict with COS locks. * if operation is reint, it will check whether it's a distributed operation (involved objects are remote or striped) if so, check against COS locks when enqueuing locks. Eliminate dependency in dir creation Mkdir needs to take a lock on child, so that any subsequent distributed operation using that directory would observe a conflict and ensure that the original mkdir is committed. Benchmark result with createmany/unlinkmany is as follows: mkdir rmdir open unlink mknod unlink (ops/sec) 2.6 1194 1310 1314 1185 2242 1396 master 978 1166 937 1028 1681 1202 current 930 1161 918 1018 1691 1202 * 10 createmany/unlinkmany processes running on local client (on MDS), 4M dirs/files created/unlinked, and the numbers are average of 10 processes. * for 2.6, each process is running on a separate mountpoint. 
Signed-off-by: Lai Siyao Change-Id: I91928d097cbb26bd1e1089c3f8851ac6a6440a69 Reviewed-on: http://review.whamcloud.com/12530 Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin Tested-by: Oleg Drokin --- lustre/contrib/wireshark/lustre_dlm_flags_wshark.c | 19 + lustre/include/lu_target.h | 4 + lustre/include/lustre/lustre_idl.h | 5 + lustre/include/lustre_dlm.h | 7 + lustre/include/lustre_dlm_flags.h | 17 + lustre/ldlm/ldlm_inodebits.c | 44 +- lustre/ldlm/ldlm_lock.c | 4 + lustre/ldlm/ldlm_lockd.c | 1 + lustre/lod/lod_object.c | 52 +- lustre/mdt/mdt_handler.c | 188 ++++- lustre/mdt/mdt_internal.h | 19 + lustre/mdt/mdt_lproc.c | 100 +++ lustre/mdt/mdt_reint.c | 871 ++++++++++++--------- lustre/osp/osp_md_object.c | 9 +- lustre/target/tgt_handler.c | 42 +- lustre/target/tgt_internal.h | 2 + lustre/target/tgt_lastrcvd.c | 2 + lustre/target/tgt_main.c | 112 +++ lustre/tests/conf-sanity.sh | 53 ++ lustre/tests/sanityn.sh | 119 ++- lustre/utils/liblustreapi.c | 12 + 21 files changed, 1203 insertions(+), 479 deletions(-) diff --git a/lustre/contrib/wireshark/lustre_dlm_flags_wshark.c b/lustre/contrib/wireshark/lustre_dlm_flags_wshark.c index 0428d8b..eb091fb 100644 --- a/lustre/contrib/wireshark/lustre_dlm_flags_wshark.c +++ b/lustre/contrib/wireshark/lustre_dlm_flags_wshark.c @@ -21,6 +21,7 @@ static int hf_lustre_ldlm_fl_no_timeout = -1; static int hf_lustre_ldlm_fl_block_nowait = -1; static int hf_lustre_ldlm_fl_test_lock = -1; static int hf_lustre_ldlm_fl_cancel_on_block = -1; +static int hf_lustre_ldlm_fl_cos_incompat = -1; static int hf_lustre_ldlm_fl_deny_on_contention = -1; static int hf_lustre_ldlm_fl_ast_discard_data = -1; @@ -39,6 +40,7 @@ const value_string lustre_ldlm_flags_vals[] = { {LDLM_FL_BLOCK_NOWAIT, "LDLM_FL_BLOCK_NOWAIT"}, {LDLM_FL_TEST_LOCK, "LDLM_FL_TEST_LOCK"}, {LDLM_FL_CANCEL_ON_BLOCK, "LDLM_FL_CANCEL_ON_BLOCK"}, + {LDLM_FL_COS_INCOMPAT, "LDLM_FL_COS_INCOMPAT"}, {LDLM_FL_DENY_ON_CONTENTION, "LDLM_FL_DENY_ON_CONTENTION"}, 
{LDLM_FL_AST_DISCARD_DATA, "LDLM_FL_AST_DISCARD_DATA"}, { 0, NULL } @@ -81,6 +83,7 @@ lustre_dissect_element_ldlm_lock_flags( dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_block_nowait); dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_test_lock); dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_cancel_on_block); + dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_cos_incompat); dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_deny_on_contention); return dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_ast_discard_data); @@ -279,6 +282,22 @@ lustre_dissect_element_ldlm_lock_flags( } }, { + /* p_id */ &hf_lustre_ldlm_fl_cos_incompat, + /* hfinfo */ { + /* name */ "LDLM_FL_COS_INCOMPAT", + /* abbrev */ "lustre.ldlm_fl_cos_incompat", + /* type */ FT_BOOLEAN, + /* display */ 32, + /* strings */ TFS(&lnet_flags_set_truth), + /* bitmask */ LDLM_FL_COS_INCOMPAT, + /* blurb */ "Flag whether a lock is enqueued from a distributed transaction, and the\n" + "requesting lock mode is PW/EX, if so, it will check compatibility with COS\n" + "locks, and different from original COS semantic, transactions from the same\n" + "client is also treated as lock conflict.", + /* id */ HFILL + } + }, + { /* p_id */ &hf_lustre_ldlm_fl_deny_on_contention, /* hfinfo */ { /* name */ "LDLM_FL_DENY_ON_CONTENTION", diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 5da2544..e831975 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -159,6 +159,8 @@ struct lu_target { struct dt_object *lut_reply_data; /** Bitmap of used slots in the reply data file */ unsigned long **lut_reply_bitmap; + /** target sync count, used for debug & test */ + atomic_t lut_sync_count; }; /* number of slots in reply bitmap */ @@ -425,6 +427,8 @@ int tgt_hpreq_handler(struct ptlrpc_request *req); /* target/tgt_main.c */ void tgt_boot_epoch_update(struct lu_target *lut); +void tgt_save_slc_lock(struct ldlm_lock *lock, 
__u64 transno); +void tgt_discard_slc_lock(struct ldlm_lock *lock); int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut, struct obd_export *exp, __u64 transno); int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp); diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 354b6aa..b0fa434 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1032,6 +1032,11 @@ static inline void lustre_handle_copy(struct lustre_handle *tgt, tgt->cookie = src->cookie; } +struct lustre_handle_array { + unsigned int count; + struct lustre_handle handles[0]; +}; + /* flags for lm_flags */ #define MSGHDR_AT_SUPPORT 0x1 #define MSGHDR_CKSUM_INCOMPAT18 0x2 diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 89a029a..61728ea 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -900,6 +900,13 @@ struct ldlm_lock { struct list_head l_exp_list; }; +/** For uncommitted cross-MDT lock, store transno this lock belongs to */ +#define l_transno l_client_cookie + +/** For uncommitted cross-MDT lock, which is client lock, share with l_rk_ast + * which is for server. */ +#define l_slc_link l_rk_ast + /** * LDLM resource description. * Basically, resource is a representation for a single object. diff --git a/lustre/include/lustre_dlm_flags.h b/lustre/include/lustre_dlm_flags.h index 5f7206d..9b7037e 100644 --- a/lustre/include/lustre_dlm_flags.h +++ b/lustre/include/lustre_dlm_flags.h @@ -129,6 +129,15 @@ #define ldlm_set_cancel_on_block(_l) LDLM_SET_FLAG(( _l), 1ULL << 23) #define ldlm_clear_cancel_on_block(_l) LDLM_CLEAR_FLAG((_l), 1ULL << 23) +/** Flag whether a lock is enqueued from a distributed transaction, and the + * requesting lock mode is PW/EX, if so, it will check compatibility with COS + * locks, and different from original COS semantic, transactions from the same + * client is also treated as lock conflict. 
*/ +#define LDLM_FL_COS_INCOMPAT 0x0000000001000000ULL /* bit 24 */ +#define ldlm_is_cos_incompat(_l) LDLM_TEST_FLAG((_l), 1ULL << 24) +#define ldlm_set_cos_incompat(_l) LDLM_SET_FLAG((_l), 1ULL << 24) +#define ldlm_clear_cos_incompat(_l) LDLM_CLEAR_FLAG((_l), 1ULL << 24) + /** * measure lock contention and return -EUSERS if locking contention is high */ #define LDLM_FL_DENY_ON_CONTENTION 0x0000000040000000ULL // bit 30 @@ -345,6 +354,14 @@ /** Flag whether a lock is found on server for re-sent RPC. */ #define LDLM_FL_RESENT 0x0100000000000000ULL // bit 56 +/** Flag whether Commit-on-Sharing is enabled, if LDLM_FL_COS_INCOMPAT is set + * this flag may not be set because once the former is set this flag won't be + * checked, and for cross-MDT lock COS_INCOMPAT is always set but ast handle is + * in ldlm context which doesn't know whether COS is enabled or not. */ +#define LDLM_FL_COS_ENABLED 0x0200000000000000ULL /* bit 57 */ +#define ldlm_is_cos_enabled(_l) LDLM_TEST_FLAG((_l), 1ULL << 57) +#define ldlm_set_cos_enabled(_l) LDLM_SET_FLAG((_l), 1ULL << 57) + /** l_flags bits marked as "ast" bits */ #define LDLM_FL_AST_MASK (LDLM_FL_FLOCK_DEADLOCK |\ LDLM_FL_AST_DISCARD_DATA) diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 0c31481..74119fb 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -56,6 +56,7 @@ #include #include #include +#include #include "ldlm_internal.h" @@ -81,7 +82,6 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, { struct list_head *tmp; struct ldlm_lock *lock; - enum ldlm_mode req_mode = req->l_req_mode; __u64 req_bits = req->l_policy_data.l_inodebits.bits; int compat = 1; ENTRY; @@ -98,23 +98,33 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, /* We stop walking the queue if we hit ourselves so we don't * take conflicting locks enqueued after us into account, * or we'd wait forever. 
*/ - if (req == lock) - RETURN(compat); + if (req == lock) + RETURN(compat); - /* last lock in mode group */ - LASSERT(lock->l_sl_mode.prev != NULL); + /* last lock in mode group */ + LASSERT(lock->l_sl_mode.prev != NULL); mode_tail = &list_entry(lock->l_sl_mode.prev, - struct ldlm_lock, - l_sl_mode)->l_res_link; + struct ldlm_lock, + l_sl_mode)->l_res_link; - /* locks are compatible, bits don't matter */ - if (lockmode_compat(lock->l_req_mode, req_mode)) { - /* jump to last lock in mode group */ - tmp = mode_tail; - continue; - } + /* if reqest lock is not COS_INCOMPAT and COS is disabled, + * they are compatible, IOW this request is from a local + * transaction on a DNE system. */ + if (lock->l_req_mode == LCK_COS && !ldlm_is_cos_incompat(req) && + !ldlm_is_cos_enabled(req)) { + /* jump to last lock in mode group */ + tmp = mode_tail; + continue; + } - for (;;) { + /* locks' mode are compatible, bits don't matter */ + if (lockmode_compat(lock->l_req_mode, req->l_req_mode)) { + /* jump to last lock in mode group */ + tmp = mode_tail; + continue; + } + + for (;;) { struct list_head *head; /* Advance loop cursor to last lock in policy group. */ @@ -128,6 +138,8 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, * requirement: it is only compatible with * locks from the same client. */ if (lock->l_req_mode == LCK_COS && + !ldlm_is_cos_incompat(req) && + ldlm_is_cos_enabled(req) && lock->l_client_cookie == req->l_client_cookie) goto not_conflicting; /* Found a conflicting policy group. 
*/ @@ -206,8 +218,8 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, ldlm_grant_lock(lock, work_list); *err = ELDLM_OK; - RETURN(LDLM_ITER_CONTINUE); - } + RETURN(LDLM_ITER_CONTINUE); + } restart: rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 484f0b4..90580b0 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1799,6 +1799,10 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, ldlm_set_ast_discard_data(lock); if (*flags & LDLM_FL_TEST_LOCK) ldlm_set_test_lock(lock); + if (*flags & LDLM_FL_COS_INCOMPAT) + ldlm_set_cos_incompat(lock); + if (*flags & LDLM_FL_COS_ENABLED) + ldlm_set_cos_enabled(lock); /* This distinction between local lock trees is very important; a client * namespace only has information about locks taken by that client, and diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 5060265..a0586af 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -903,6 +903,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "server preparing blocking AST"); ptlrpc_request_set_replen(req); + ldlm_set_cbpending(lock); if (instant_cancel) { unlock_res_and_lock(lock); ldlm_lock_cancel(lock); diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 5118151..ccf79ec 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -3890,11 +3890,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, return dt_object_sync(env, dt_object_child(dt), start, end); } -struct lod_slave_locks { - int lsl_lock_count; - struct lustre_handle lsl_handle[0]; -}; - /** * Release LDLM locks on the stripes of a striped directory. 
* @@ -3914,7 +3909,7 @@ static int lod_object_unlock_internal(const struct lu_env *env, struct ldlm_enqueue_info *einfo, union ldlm_policy_data *policy) { - struct lod_slave_locks *slave_locks = einfo->ei_cbdata; + struct lustre_handle_array *slave_locks = einfo->ei_cbdata; int rc = 0; int i; ENTRY; @@ -3922,9 +3917,9 @@ static int lod_object_unlock_internal(const struct lu_env *env, if (slave_locks == NULL) RETURN(0); - for (i = 1; i < slave_locks->lsl_lock_count; i++) { - if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) - ldlm_lock_decref(&slave_locks->lsl_handle[i], + for (i = 1; i < slave_locks->count; i++) { + if (lustre_handle_is_used(&slave_locks->handles[i])) + ldlm_lock_decref(&slave_locks->handles[i], einfo->ei_mode); } @@ -3943,32 +3938,31 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, struct ldlm_enqueue_info *einfo, union ldlm_policy_data *policy) { - struct lod_object *lo = lod_dt_obj(dt); - struct lod_slave_locks *slave_locks = einfo->ei_cbdata; - int slave_locks_size; - int rc; + struct lod_object *lo = lod_dt_obj(dt); + struct lustre_handle_array *slave_locks = einfo->ei_cbdata; + int slave_locks_size; + int i; ENTRY; if (slave_locks == NULL) RETURN(0); - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); - + LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); + LASSERT(lo->ldo_stripenr > 1); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ - if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt))) - RETURN(0); + LASSERT(!dt_object_remote(dt_object_child(dt))); - /* Only cancel slave lock for striped dir */ - rc = lod_object_unlock_internal(env, dt, einfo, policy); + /* locks were unlocked in MDT layer */ + for (i = 1; i < slave_locks->count; i++) + LASSERT(!lustre_handle_is_used(&slave_locks->handles[i])); - slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count * - sizeof(slave_locks->lsl_handle[0]); + slave_locks_size = 
sizeof(*slave_locks) + slave_locks->count * + sizeof(slave_locks->handles[0]); OBD_FREE(slave_locks, slave_locks_size); einfo->ei_cbdata = NULL; - RETURN(rc); + RETURN(0); } /** @@ -3989,7 +3983,7 @@ static int lod_object_lock(const struct lu_env *env, int rc = 0; int i; int slave_locks_size; - struct lod_slave_locks *slave_locks = NULL; + struct lustre_handle_array *slave_locks = NULL; ENTRY; /* remote object lock */ @@ -4011,12 +4005,12 @@ static int lod_object_lock(const struct lu_env *env, RETURN(0); slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr * - sizeof(slave_locks->lsl_handle[0]); + sizeof(slave_locks->handles[0]); /* Freed in lod_object_unlock */ OBD_ALLOC(slave_locks, slave_locks_size); if (slave_locks == NULL) RETURN(-ENOMEM); - slave_locks->lsl_lock_count = lo->ldo_stripenr; + slave_locks->count = lo->ldo_stripenr; /* striped directory lock */ for (i = 1; i < lo->ldo_stripenr; i++) { @@ -4038,6 +4032,10 @@ static int lod_object_lock(const struct lu_env *env, ldlm_completion_callback completion = einfo->ei_cb_cp; __u64 dlmflags = LDLM_FL_ATOMIC_CB; + if (einfo->ei_mode == LCK_PW || + einfo->ei_mode == LCK_EX) + dlmflags |= LDLM_FL_COS_INCOMPAT; + /* This only happens if there are mulitple stripes * on the master MDT, i.e. 
except stripe0, there are * other stripes on the Master MDT as well, Only @@ -4052,7 +4050,7 @@ static int lod_object_lock(const struct lu_env *env, } if (rc != 0) GOTO(out, rc); - slave_locks->lsl_handle[i] = lockh; + slave_locks->handles[i] = lockh; } einfo->ei_cbdata = slave_locks; diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 18ab7b6..3035202 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -170,6 +170,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, const struct lu_name *lname) { lh->mlh_reg_mode = lock_mode; + lh->mlh_pdo_mode = LCK_MINMODE; lh->mlh_rreg_mode = lock_mode; lh->mlh_type = MDT_PDO_LOCK; @@ -2155,11 +2156,14 @@ static void mdt_device_commit_async(const struct lu_env *env, { struct dt_device *dt = mdt->mdt_bottom; int rc; + ENTRY; rc = dt->dd_ops->dt_commit_async(env, dt); if (unlikely(rc != 0)) CWARN("%s: async commit start failed: rc = %d\n", mdt_obd_name(mdt), rc); + atomic_inc(&mdt->mdt_async_commit_count); + EXIT; } /** @@ -2215,17 +2219,22 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, if (flag == LDLM_CB_CANCELING) RETURN(0); + lock_res_and_lock(lock); if (lock->l_blocking_ast != mdt_blocking_ast) { unlock_res_and_lock(lock); RETURN(0); } - if (mdt_cos_is_enabled(mdt) && - lock->l_req_mode & (LCK_PW | LCK_EX) && - lock->l_blocking_lock != NULL && - lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) { - mdt_set_lock_sync(lock); - } + if (lock->l_req_mode & (LCK_PW | LCK_EX) && + lock->l_blocking_lock != NULL) { + if (mdt_cos_is_enabled(mdt) && + lock->l_client_cookie != + lock->l_blocking_lock->l_client_cookie) + mdt_set_lock_sync(lock); + else if (mdt_slc_is_enabled(mdt) && + ldlm_is_cos_incompat(lock->l_blocking_lock)) + mdt_set_lock_sync(lock); + } rc = ldlm_blocking_ast_nocheck(lock); /* There is no lock conflict if l_blocking_lock == NULL, @@ -2246,12 +2255,24 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct 
ldlm_lock_desc *desc, RETURN(rc); } -/* Used for cross-MDT lock */ +/* + * Blocking AST for cross-MDT lock + * + * Discard lock from uncommitted_slc_locks and cancel it. + * + * \param lock the lock which blocks a request or cancelling lock + * \param desc unused + * \param data unused + * \param flag indicates whether this cancelling or blocking callback + * \retval 0 on success + * \retval negative number on error + */ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { struct lustre_handle lockh; int rc; + ENTRY; switch (flag) { case LDLM_CB_BLOCKING: @@ -2265,10 +2286,14 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, break; case LDLM_CB_CANCELING: LDLM_DEBUG(lock, "Revoke remote lock\n"); + /* discard slc lock here so that it can be cleaned anytime, + * especially for cleanup_resource() */ + tgt_discard_slc_lock(lock); break; default: LBUG(); } + RETURN(0); } @@ -2342,12 +2367,12 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, static int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits, - bool nonblock) + bool nonblock, bool cos_incompat) { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags; + __u64 dlmflags = 0; int rc; ENTRY; @@ -2356,6 +2381,14 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, LASSERT(lh->mlh_reg_mode != LCK_MINMODE); LASSERT(lh->mlh_type != MDT_NUL_LOCK); + if (cos_incompat) { + LASSERT(lh->mlh_reg_mode == LCK_PW || + lh->mlh_reg_mode == LCK_EX); + dlmflags |= LDLM_FL_COS_INCOMPAT; + } else if (mdt_cos_is_enabled(info->mti_mdt)) { + dlmflags |= LDLM_FL_COS_ENABLED; + } + /* Only enqueue LOOKUP lock for remote object */ if (mdt_object_remote(o)) LASSERT(ibits == MDS_INODELOCK_LOOKUP); @@ -2375,7 +2408,7 @@ static int 
mdt_object_local_lock(struct mdt_thread_info *info, memset(policy, 0, sizeof(*policy)); fid_build_reg_res_name(mdt_object_fid(o), res_id); - dlmflags = LDLM_FL_ATOMIC_CB; + dlmflags |= LDLM_FL_ATOMIC_CB; if (nonblock) dlmflags |= LDLM_FL_BLOCK_NOWAIT; @@ -2432,15 +2465,18 @@ out_unlock: static int mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits, - bool nonblock) + struct mdt_lock_handle *lh, __u64 ibits, bool nonblock, + bool cos_incompat) { struct mdt_lock_handle *local_lh = NULL; int rc; ENTRY; - if (!mdt_object_remote(o)) - return mdt_object_local_lock(info, o, lh, ibits, nonblock); + if (!mdt_object_remote(o)) { + rc = mdt_object_local_lock(info, o, lh, ibits, nonblock, + cos_incompat); + RETURN(rc); + } /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */ ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT | @@ -2448,9 +2484,8 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, /* Only enqueue LOOKUP lock for remote object */ if (ibits & MDS_INODELOCK_LOOKUP) { - rc = mdt_object_local_lock(info, o, lh, - MDS_INODELOCK_LOOKUP, - nonblock); + rc = mdt_object_local_lock(info, o, lh, MDS_INODELOCK_LOOKUP, + nonblock, cos_incompat); if (rc != ELDLM_OK) RETURN(rc); @@ -2488,7 +2523,16 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits) { - return mdt_object_lock_internal(info, o, lh, ibits, false); + return mdt_object_lock_internal(info, o, lh, ibits, false, false); +} + +int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 ibits, + bool cos_incompat) +{ + LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX); + return mdt_object_lock_internal(info, o, lh, ibits, false, + cos_incompat); } int mdt_object_lock_try(struct mdt_thread_info *info, 
struct mdt_object *o, @@ -2497,7 +2541,22 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle tmp = *lh; int rc; - rc = mdt_object_lock_internal(info, o, &tmp, ibits, true); + rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, false); + if (rc == 0) + *lh = tmp; + + return rc == 0; +} + +int mdt_reint_object_lock_try(struct mdt_thread_info *info, + struct mdt_object *o, struct mdt_lock_handle *lh, + __u64 ibits, bool cos_incompat) +{ + struct mdt_lock_handle tmp = *lh; + int rc; + + LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX); + rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, cos_incompat); if (rc == 0) *lh = tmp; @@ -2530,24 +2589,27 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, struct mdt_device *mdt = info->mti_mdt; struct ldlm_lock *lock = ldlm_handle2lock(h); struct ptlrpc_request *req = mdt_info_req(info); - int no_ack = 0; + int cos; + + cos = (mdt_cos_is_enabled(mdt) || + mdt_slc_is_enabled(mdt)); LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n", h->cookie); + /* there is no request if mdt_object_unlock() is called * from mdt_export_cleanup()->mdt_add_dirty_flag() */ if (likely(req != NULL)) { CDEBUG(D_HA, "request = %p reply state = %p" " transno = "LPD64"\n", req, req->rq_reply_state, req->rq_transno); - if (mdt_cos_is_enabled(mdt)) { - no_ack = 1; + if (cos) { ldlm_lock_downgrade(lock, LCK_COS); mode = LCK_COS; } - ptlrpc_save_lock(req, h, mode, no_ack); + ptlrpc_save_lock(req, h, mode, cos); } else { - ldlm_lock_decref(h, mode); + mdt_fid_unlock(h, mode); } if (mdt_is_lock_sync(lock)) { CDEBUG(D_HA, "found sync-lock," @@ -2564,6 +2626,41 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, } /** + * Save cross-MDT lock in uncommitted_slc_locks + * + * Keep the lock referenced until transaction commit happens or release the lock + * immediately depending on input parameters. 
+ * + * \param info thead info object + * \param h lock handle + * \param mode lock mode + * \param decref force immediate lock releasing + */ +static void mdt_save_remote_lock(struct mdt_thread_info *info, + struct lustre_handle *h, enum ldlm_mode mode, + int decref) +{ + ENTRY; + + if (lustre_handle_is_used(h)) { + if (decref || !info->mti_has_trans || + !(mode & (LCK_PW | LCK_EX))) { + ldlm_lock_decref_and_cancel(h, mode); + } else { + struct ldlm_lock *lock = ldlm_handle2lock(h); + struct ptlrpc_request *req = mdt_info_req(info); + + LASSERT(req != NULL); + tgt_save_slc_lock(lock, req->rq_transno); + ldlm_lock_decref(h, mode); + } + h->cookie = 0ull; + } + + EXIT; +} + +/** * Unlock mdt object. * * Immeditely release the regular lock and the PDO lock or save the @@ -2576,17 +2673,15 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, * \param decref force immediate lock releasing */ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, int decref) + struct mdt_lock_handle *lh, int decref) { - ENTRY; - - mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); - mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); + ENTRY; - if (lustre_handle_is_used(&lh->mlh_rreg_lh)) - ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode); + mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); + mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); + mdt_save_remote_lock(info, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref); - EXIT; + EXIT; } struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info, @@ -4375,6 +4470,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_enable_remote_dir = 0; m->mdt_enable_remote_dir_gid = 0; + atomic_set(&m->mdt_mds_mds_conns, 0); + atomic_set(&m->mdt_async_commit_count, 0); + m->mdt_lu_dev.ld_ops = &mdt_lu_ops; m->mdt_lu_dev.ld_obd = obd; /* Set this lu_device to obd for error handling 
purposes. */ @@ -5004,6 +5102,18 @@ static int mdt_export_cleanup(struct obd_export *exp) RETURN(rc); } +static inline void mdt_enable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL) + mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL; +} + +static inline void mdt_disable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL) + mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; +} + static int mdt_obd_disconnect(struct obd_export *exp) { int rc; @@ -5012,6 +5122,14 @@ static int mdt_obd_disconnect(struct obd_export *exp) LASSERT(exp); class_export_get(exp); + if ((exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) && + !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)) { + struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); + + if (atomic_dec_and_test(&mdt->mdt_mds_mds_conns)) + mdt_disable_slc(mdt); + } + rc = server_disconnect_export(exp); if (rc != 0) CDEBUG(D_IOCTL, "server disconnect error: rc = %d\n", rc); @@ -5042,6 +5160,12 @@ static int mdt_obd_connect(const struct lu_env *env, mdt = mdt_dev(obd->obd_lu_dev); + if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && + !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { + atomic_inc(&mdt->mdt_mds_mds_conns); + mdt_enable_slc(mdt); + } + /* * first, check whether the stack is ready to handle requests * XXX: probably not very appropriate method is used now diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 178868f..95c8a24 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -216,6 +216,12 @@ struct mdt_device { struct lu_device *mdt_qmt_dev; struct coordinator mdt_coordinator; + + /* inter-MDT connection count */ + atomic_t mdt_mds_mds_conns; + + /* MDT device async commit count, used for debug and sanity test */ + atomic_t mdt_async_commit_count; }; #define MDT_SERVICE_WATCHDOG_FACTOR (2) @@ -586,9 +592,17 @@ int mdt_check_resent_lock(struct 
mdt_thread_info *info, struct mdt_object *mo, int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *mo, struct mdt_lock_handle *lh, __u64 ibits); +int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 ibits, + bool cos_incompat); + int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *mo, struct mdt_lock_handle *lh, __u64 ibits); +int mdt_reint_object_lock_try(struct mdt_thread_info *info, + struct mdt_object *o, struct mdt_lock_handle *lh, + __u64 ibits, bool cos_incompat); + void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo, struct mdt_lock_handle *lh, int decref); @@ -965,6 +979,11 @@ static inline void mdt_fid_unlock(struct lustre_handle *lh, enum ldlm_mode mode) ldlm_lock_decref(lh, mode); } +static inline bool mdt_slc_is_enabled(struct mdt_device *mdt) +{ + return mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL; +} + extern mdl_mode_t mdt_mdl_lock_modes[]; extern enum ldlm_mode mdt_dlm_lock_modes[]; diff --git a/lustre/mdt/mdt_lproc.c b/lustre/mdt/mdt_lproc.c index 9f6ca37..58728e8 100644 --- a/lustre/mdt/mdt_lproc.c +++ b/lustre/mdt/mdt_lproc.c @@ -677,6 +677,100 @@ mdt_enable_remote_dir_gid_seq_write(struct file *file, } LPROC_SEQ_FOPS(mdt_enable_remote_dir_gid); +/** + * Show MDT policy for handling dirty metadata under a lock being cancelled. + * + * \param[in] m seq_file handle + * \param[in] data unused for single entry + * + * \retval 0 on success + * \retval negative value on error + */ +static int mdt_slc_seq_show(struct seq_file *m, void *data) +{ + struct obd_device *obd = m->private; + struct lu_target *tgt = obd->u.obt.obt_lut; + char *slc_states[] = {"never", "blocking", "always" }; + + return seq_printf(m, "%s\n", slc_states[tgt->lut_sync_lock_cancel]); +} +LPROC_SEQ_FOPS_RO(mdt_slc); + +/** + * Show MDT async commit count. 
+ * + * \param[in] m seq_file handle + * \param[in] data unused for single entry + * + * \retval 0 on success + * \retval negative value on error + */ +static int mdt_async_commit_count_seq_show(struct seq_file *m, void *data) +{ + struct obd_device *obd = m->private; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + + return seq_printf(m, "%d\n", atomic_read(&mdt->mdt_async_commit_count)); +} + +static ssize_t +mdt_async_commit_count_seq_write(struct file *file, const char __user *buffer, + size_t count, loff_t *off) +{ + struct seq_file *m = file->private_data; + struct obd_device *obd = m->private; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + int val; + int rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + atomic_set(&mdt->mdt_async_commit_count, val); + + return count; +} +LPROC_SEQ_FOPS(mdt_async_commit_count); + +/** + * Show MDT sync count. + * + * \param[in] m seq_file handle + * \param[in] data unused for single entry + * + * \retval 0 on success + * \retval negative value on error + */ +static int mdt_sync_count_seq_show(struct seq_file *m, void *data) +{ + struct obd_device *obd = m->private; + struct lu_target *tgt = obd->u.obt.obt_lut; + + return seq_printf(m, "%d\n", atomic_read(&tgt->lut_sync_count)); +} + +static ssize_t +mdt_sync_count_seq_write(struct file *file, const char __user *buffer, + size_t count, loff_t *off) +{ + struct seq_file *m = file->private_data; + struct obd_device *obd = m->private; + struct lu_target *tgt = obd->u.obt.obt_lut; + int val; + int rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + atomic_set(&tgt->lut_sync_count, val); + + return count; +} +LPROC_SEQ_FOPS(mdt_sync_count); + + LPROC_SEQ_FOPS_RO_TYPE(mdt, uuid); LPROC_SEQ_FOPS_RO_TYPE(mdt, recovery_status); LPROC_SEQ_FOPS_RO_TYPE(mdt, num_exports); @@ -740,6 +834,12 @@ static struct lprocfs_vars lprocfs_mdt_obd_vars[] = { .fops = &mdt_recovery_time_hard_fops }, { .name = 
"recovery_time_soft", .fops = &mdt_recovery_time_soft_fops }, + { .name = "sync_lock_cancel", + .fops = &mdt_slc_fops }, + { .name = "async_commit_count", + .fops = &mdt_async_commit_count_fops }, + { .name = "sync_count", + .fops = &mdt_sync_count_fops }, { NULL } }; diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index d18019b..5aac290 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -292,131 +292,16 @@ static int mdt_remote_permission(struct mdt_thread_info *info) return 0; } -/* - * VBR: we save three versions in reply: - * 0 - parent. Check that parent version is the same during replay. - * 1 - name. Version of 'name' if file exists with the same name or - * ENOENT_VERSION, it is needed because file may appear due to missed replays. - * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity - * check. - */ -static int mdt_md_create(struct mdt_thread_info *info) -{ - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *parent; - struct mdt_object *child; - struct mdt_lock_handle *lh; - struct mdt_body *repbody; - struct md_attr *ma = &info->mti_attr; - struct mdt_reint_record *rr = &info->mti_rr; - int rc; - ENTRY; - - DEBUG_REQ(D_INODE, mdt_info_req(info), "Create ("DNAME"->"DFID") " - "in "DFID, - PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1)); - - if (!fid_is_md_operative(rr->rr_fid1)) - RETURN(-EPERM); - - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - - parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); - if (IS_ERR(parent)) - RETURN(PTR_ERR(parent)); - - if (!mdt_object_exists(parent)) - GOTO(put_parent, rc = -ENOENT); - - lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name); - rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE); - if (rc) - GOTO(put_parent, rc); - - if (!mdt_object_remote(parent)) { - rc = mdt_version_get_check_save(info, parent, 0); - if (rc) - GOTO(unlock_parent, rc); - } - - /* - * Check child 
name version during replay. - * During create replay a file may exist with same name. - */ - rc = mdt_lookup_version_check(info, parent, &rr->rr_name, - &info->mti_tmp_fid1, 1); - if (rc == 0) - GOTO(unlock_parent, rc = -EEXIST); - - /* -ENOENT is expected here */ - if (rc != -ENOENT) - GOTO(unlock_parent, rc); - - /* save version of file name for replay, it must be ENOENT here */ - mdt_enoent_version_save(info, 1); - - child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2); - if (likely(!IS_ERR(child))) { - struct md_object *next = mdt_object_child(parent); - - rc = mdt_remote_permission(info); - if (rc != 0) - GOTO(out_put_child, rc); - - ma->ma_need = MA_INODE; - ma->ma_valid = 0; - - mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, - OBD_FAIL_MDS_REINT_CREATE_WRITE); - - /* Version of child will be updated on disk. */ - tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child)); - rc = mdt_version_get_check_save(info, child, 2); - if (rc) - GOTO(out_put_child, rc); - - /* Let lower layer know current lock mode. */ - info->mti_spec.sp_cr_mode = - mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode); - - /* - * Do not perform lookup sanity check. We know that name does - * not exist. - */ - info->mti_spec.sp_cr_lookup = 0; - info->mti_spec.sp_feat = &dt_directory_features; - - rc = mdo_create(info->mti_env, next, &rr->rr_name, - mdt_object_child(child), &info->mti_spec, ma); - if (rc == 0) - rc = mdt_attr_get_complex(info, child, ma); - - if (rc == 0) { - /* Return fid & attr to client. 
*/ - if (ma->ma_valid & MA_INODE) - mdt_pack_attr2body(info, repbody, &ma->ma_attr, - mdt_object_fid(child)); - } -out_put_child: - mdt_object_put(info->mti_env, child); - } else { - rc = PTR_ERR(child); - } -unlock_parent: - mdt_object_unlock(info, parent, lh, rc); -put_parent: - mdt_object_put(info->mti_env, parent); - RETURN(rc); -} - static int mdt_unlock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, __u64 ibits, struct mdt_lock_handle *s0_lh, struct mdt_object *s0_obj, - struct ldlm_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + int decref) { union ldlm_policy_data *policy = &mti->mti_policy; + struct lustre_handle_array *slave_locks = einfo->ei_cbdata; + int i; int rc; ENTRY; @@ -426,31 +311,33 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti, /* Unlock stripe 0 */ if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) { LASSERT(s0_obj != NULL); - mdt_object_unlock_put(mti, s0_obj, s0_lh, 1); + mdt_object_unlock_put(mti, s0_obj, s0_lh, decref); } memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; + if (slave_locks != NULL) { + LASSERT(s0_lh != NULL); + for (i = 1; i < slave_locks->count; i++) { + /* borrow s0_lh temporarily to do mdt unlock */ + mdt_lock_reg_init(s0_lh, einfo->ei_mode); + s0_lh->mlh_rreg_lh = slave_locks->handles[i]; + mdt_object_unlock(mti, obj, s0_lh, decref); + slave_locks->handles[i].cookie = 0ull; + } + } + rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo, policy); RETURN(rc); } -/** - * Lock slave stripes if necessary, the lock handles of slave stripes - * will be stored in einfo->ei_cbdata. 
- **/ -static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, - enum ldlm_mode mode, __u64 ibits, - struct mdt_lock_handle *s0_lh, - struct mdt_object **s0_objp, - struct ldlm_enqueue_info *einfo) +static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, + struct lu_fid *fid) { - union ldlm_policy_data *policy = &mti->mti_policy; struct lu_buf *buf = &mti->mti_buf; struct lmv_mds_md_v1 *lmv; - struct lu_fid *fid = &mti->mti_tmp_fid1; int rc; ENTRY; @@ -480,15 +367,46 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, RETURN(-EINVAL); fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]); - if (!lu_fid_eq(fid, mdt_object_fid(obj))) { + + RETURN(rc); +} + +/** + * Lock slave stripes if necessary, the lock handles of slave stripes + * will be stored in einfo->ei_cbdata. + **/ +static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, + enum ldlm_mode mode, __u64 ibits, + struct lu_fid *s0_fid, + struct mdt_lock_handle *s0_lh, + struct mdt_object **s0_objp, + struct ldlm_enqueue_info *einfo) +{ + union ldlm_policy_data *policy = &mti->mti_policy; + int rc; + ENTRY; + + rc = mdt_init_slaves(mti, obj, s0_fid); + if (rc <= 0) + RETURN(rc); + + LASSERT(S_ISDIR(obj->mot_header.loh_attr)); + + if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) { /* Except migrating object, whose 0_stripe and master * object are the same object, 0_stripe and master * object are different, though they are in the same * MDT, to avoid adding osd_object_lock here, so we * will enqueue the stripe0 lock in MDT0 for now */ - *s0_objp = mdt_object_find_lock(mti, fid, s0_lh, ibits); + *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid); if (IS_ERR(*s0_objp)) RETURN(PTR_ERR(*s0_objp)); + + rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true); + if (rc < 0) { + mdt_object_put(mti->mti_env, *s0_objp); + RETURN(rc); + } } memset(einfo, 0, sizeof(*einfo)); @@ -507,6 +425,174 @@ static int 
mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, RETURN(rc); } +/* + * VBR: we save three versions in reply: + * 0 - parent. Check that parent version is the same during replay. + * 1 - name. Version of 'name' if file exists with the same name or + * ENOENT_VERSION, it is needed because file may appear due to missed replays. + * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity + * check. + */ +static int mdt_md_create(struct mdt_thread_info *info) +{ + struct mdt_device *mdt = info->mti_mdt; + struct mdt_object *parent; + struct mdt_object *child; + struct mdt_lock_handle *lh; + struct mdt_body *repbody; + struct md_attr *ma = &info->mti_attr; + struct mdt_reint_record *rr = &info->mti_rr; + int rc; + ENTRY; + + DEBUG_REQ(D_INODE, mdt_info_req(info), "Create ("DNAME"->"DFID") " + "in "DFID, + PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1)); + + if (!fid_is_md_operative(rr->rr_fid1)) + RETURN(-EPERM); + + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + + parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); + if (IS_ERR(parent)) + RETURN(PTR_ERR(parent)); + + if (!mdt_object_exists(parent)) + GOTO(put_parent, rc = -ENOENT); + + lh = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name); + rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE); + if (rc) + GOTO(put_parent, rc); + + if (!mdt_object_remote(parent)) { + rc = mdt_version_get_check_save(info, parent, 0); + if (rc) + GOTO(unlock_parent, rc); + } + + /* + * Check child name version during replay. + * During create replay a file may exist with same name. 
+ */ + rc = mdt_lookup_version_check(info, parent, &rr->rr_name, + &info->mti_tmp_fid1, 1); + if (rc == 0) + GOTO(unlock_parent, rc = -EEXIST); + + /* -ENOENT is expected here */ + if (rc != -ENOENT) + GOTO(unlock_parent, rc); + + /* save version of file name for replay, it must be ENOENT here */ + mdt_enoent_version_save(info, 1); + + child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2); + if (unlikely(IS_ERR(child))) + GOTO(unlock_parent, rc = PTR_ERR(child)); + + rc = mdt_remote_permission(info); + if (rc != 0) + GOTO(put_child, rc); + + ma->ma_need = MA_INODE; + ma->ma_valid = 0; + + mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, + OBD_FAIL_MDS_REINT_CREATE_WRITE); + + /* Version of child will be updated on disk. */ + tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child)); + rc = mdt_version_get_check_save(info, child, 2); + if (rc) + GOTO(put_child, rc); + + /* Let lower layer know current lock mode. */ + info->mti_spec.sp_cr_mode = mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode); + + /* + * Do not perform lookup sanity check. We know that name does + * not exist. + */ + info->mti_spec.sp_cr_lookup = 0; + info->mti_spec.sp_feat = &dt_directory_features; + + rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name, + mdt_object_child(child), &info->mti_spec, ma); + if (rc == 0) + rc = mdt_attr_get_complex(info, child, ma); + + if (rc < 0) + GOTO(put_child, rc); + + /* + * On DNE, we need to eliminate dependency between 'mkdir a' and + * 'mkdir a/b' if b is a striped directory, to achieve this, two + * things are done below: + * 1. save child and slaves lock. + * 2. if the child is a striped directory, relock parent so as to + * compare against COS locks to ensure parent was + * committed to disk.
+ */ + if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) { + struct mdt_lock_handle *lhc; + struct mdt_lock_handle *s0_lh; + struct mdt_object *s0_obj = NULL; + struct ldlm_enqueue_info *einfo; + struct lu_fid *s0_fid = &info->mti_tmp_fid1; + bool cos_incompat = false; + + rc = mdt_init_slaves(info, child, s0_fid); + if (rc > 0) { + cos_incompat = true; + if (!mdt_object_remote(parent)) { + mdt_object_unlock(info, parent, lh, 1); + mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name); + rc = mdt_reint_object_lock(info, parent, lh, + MDS_INODELOCK_UPDATE, + true); + if (rc) + GOTO(put_child, rc); + } + } + + einfo = &info->mti_einfo; + lhc = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_handle_init(lhc); + mdt_lock_reg_init(lhc, LCK_PW); + rc = mdt_reint_object_lock(info, child, lhc, + MDS_INODELOCK_UPDATE, + cos_incompat); + if (rc) + GOTO(put_child, rc); + mdt_object_unlock(info, child, lhc, rc); + + s0_lh = &info->mti_lh[MDT_LH_LOCAL]; + mdt_lock_handle_init(s0_lh); + mdt_lock_reg_init(s0_lh, LCK_PW); + rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE, + s0_fid, s0_lh, &s0_obj, einfo); + mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh, + s0_obj, einfo, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO) + rc = 0; + } + + /* Return fid & attr to client. 
*/ + if (ma->ma_valid & MA_INODE) + mdt_pack_attr2body(info, repbody, &ma->ma_attr, + mdt_object_fid(child)); +put_child: + mdt_object_put(info->mti_env, child); +unlock_parent: + mdt_object_unlock(info, parent, lh, rc); +put_parent: + mdt_object_put(info->mti_env, parent); + RETURN(rc); +} + static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, struct md_attr *ma) { @@ -514,11 +600,17 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS); __u64 lockpart = MDS_INODELOCK_UPDATE; struct ldlm_enqueue_info *einfo = &info->mti_einfo; - struct mdt_lock_handle *s0_lh; - struct mdt_object *s0_obj = NULL; + struct lu_fid *s0_fid = &info->mti_tmp_fid1; + struct mdt_lock_handle *s0_lh = NULL; + struct mdt_object *s0_obj = NULL; + bool cos_incompat = false; int rc; ENTRY; + rc = mdt_init_slaves(info, mo, s0_fid); + if (rc > 0) + cos_incompat = true; + lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_reg_init(lh, LCK_PW); @@ -529,13 +621,14 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; - rc = mdt_object_lock(info, mo, lh, lockpart); + rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat); if (rc != 0) RETURN(rc); s0_lh = &info->mti_lh[MDT_LH_LOCAL]; mdt_lock_reg_init(s0_lh, LCK_PW); - rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo); + rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj, + einfo); if (rc != 0) GOTO(out_unlock, rc); @@ -571,7 +664,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, EXIT; out_unlock: - mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo); + mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc); mdt_object_unlock(info, mo, lh, rc); return rc; } @@ -776,22 +869,24 @@ static int 
mdt_reint_create(struct mdt_thread_info *info, * Version of child is getting and checking during its lookup. If */ static int mdt_reint_unlink(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) + struct mdt_lock_handle *lhc) { - struct mdt_reint_record *rr = &info->mti_rr; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct lu_fid *child_fid = &info->mti_tmp_fid1; - struct mdt_object *mp; - struct mdt_object *mc; - struct mdt_lock_handle *parent_lh; - struct mdt_lock_handle *child_lh; + struct mdt_reint_record *rr = &info->mti_rr; + struct ptlrpc_request *req = mdt_info_req(info); + struct md_attr *ma = &info->mti_attr; + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct mdt_object *mp; + struct mdt_object *mc; + struct mdt_lock_handle *parent_lh; + struct mdt_lock_handle *child_lh; struct ldlm_enqueue_info *einfo = &info->mti_einfo; - struct mdt_lock_handle *s0_lh = NULL; - struct mdt_object *s0_obj = NULL; - __u64 lock_ibits; - int rc; - int no_name = 0; + struct lu_fid *s0_fid = &info->mti_tmp_fid2; + struct mdt_lock_handle *s0_lh = NULL; + struct mdt_object *s0_obj = NULL; + __u64 lock_ibits; + bool cos_incompat = false; + int no_name = 0; + int rc; ENTRY; DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1), @@ -800,34 +895,22 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) - RETURN(err_serious(-ENOENT)); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) + RETURN(err_serious(-ENOENT)); if (!fid_is_md_operative(rr->rr_fid1)) RETURN(-EPERM); - /* - * step 1: Found the parent. 
- */ mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); - if (IS_ERR(mp)) { - rc = PTR_ERR(mp); - GOTO(out, rc); - } - - parent_lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name); - rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE); - if (rc != 0) - GOTO(put_parent, rc); + if (IS_ERR(mp)) + RETURN(PTR_ERR(mp)); if (!mdt_object_remote(mp)) { rc = mdt_version_get_check_save(info, mp, 0); if (rc) - GOTO(unlock_parent, rc); + GOTO(put_parent, rc); } - /* step 2: find & lock the child */ /* lookup child object along with version checking */ fid_zero(child_fid); rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1); @@ -852,35 +935,45 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, no_name = 1; *child_fid = *rr->rr_fid2; } else { - GOTO(unlock_parent, rc); + GOTO(put_parent, rc); } } if (!fid_is_md_operative(child_fid)) - GOTO(unlock_parent, rc = -EPERM); + GOTO(put_parent, rc = -EPERM); /* We will lock the child regardless it is local or remote. No harm. 
*/ mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); if (IS_ERR(mc)) - GOTO(unlock_parent, rc = PTR_ERR(mc)); + GOTO(put_parent, rc = PTR_ERR(mc)); + + rc = mdt_init_slaves(info, mc, s0_fid); + cos_incompat = (mdt_object_remote(mp) || (rc > 0)); + + parent_lh = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name); + rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE, + cos_incompat); + if (rc != 0) + GOTO(put_child, rc); - child_lh = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_reg_init(child_lh, LCK_EX); + child_lh = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_reg_init(child_lh, LCK_EX); if (info->mti_spec.sp_rm_entry) { struct lu_ucred *uc = mdt_ucred(info); if (!mdt_is_dne_client(req->rq_export)) /* Return -ENOTSUPP for old client */ - GOTO(put_child, rc = -ENOTSUPP); + GOTO(unlock_parent, rc = -ENOTSUPP); if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) - GOTO(put_child, rc = -EPERM); + GOTO(unlock_parent, rc = -EPERM); ma->ma_need = MA_INODE; ma->ma_valid = 0; rc = mdo_unlink(info->mti_env, mdt_object_child(mp), NULL, &rr->rr_name, ma, no_name); - GOTO(put_child, rc); + GOTO(unlock_parent, rc); } if (mdt_object_remote(mc)) { @@ -890,7 +983,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n", mdt_obd_name(info->mti_mdt), PNAME(&rr->rr_name), PFID(mdt_object_fid(mc))); - GOTO(put_child, rc = -ENOENT); + GOTO(unlock_parent, rc = -ENOENT); } CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n", mdt_obd_name(info->mti_mdt), @@ -898,7 +991,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (!mdt_is_dne_client(req->rq_export)) /* Return -ENOTSUPP for old client */ - GOTO(put_child, rc = -ENOTSUPP); + GOTO(unlock_parent, rc = -ENOTSUPP); /* Revoke the LOOKUP lock of the remote object granted by * this MDT. 
Since the unlink will happen on another MDT, @@ -923,14 +1016,16 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, child_lh->mlh_rreg_mode, MDS_INODELOCK_LOOKUP, false); if (rc != ELDLM_OK) - GOTO(put_child, rc); + GOTO(unlock_parent, rc); lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mc, child_lh, lock_ibits); + rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits, + cos_incompat); if (rc != 0) - GOTO(put_child, rc); + GOTO(unlock_child, rc); + /* * Now we can only make sure we need MA_INODE, in mdd layer, will check * whether need MA_LOV and MA_COOKIE. @@ -940,8 +1035,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, s0_lh = &info->mti_lh[MDT_LH_LOCAL]; mdt_lock_reg_init(s0_lh, LCK_EX); - rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_lh, - &s0_obj, einfo); + rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid, + s0_lh, &s0_obj, einfo); if (rc != 0) GOTO(unlock_child, rc); @@ -984,15 +1079,15 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, EXIT; unlock_child: - mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo); + mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo, + rc); mdt_object_unlock(info, mc, child_lh, rc); -put_child: - mdt_object_put(info->mti_env, mc); unlock_parent: mdt_object_unlock(info, mp, parent_lh, rc); +put_child: + mdt_object_put(info->mti_env, mc); put_parent: mdt_object_put(info->mti_env, mp); -out: return rc; } @@ -1001,109 +1096,117 @@ out: * name. 
*/ static int mdt_reint_link(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) + struct mdt_lock_handle *lhc) { - struct mdt_reint_record *rr = &info->mti_rr; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct mdt_object *ms; - struct mdt_object *mp; - struct mdt_lock_handle *lhs; - struct mdt_lock_handle *lhp; - int rc; - ENTRY; + struct mdt_reint_record *rr = &info->mti_rr; + struct ptlrpc_request *req = mdt_info_req(info); + struct md_attr *ma = &info->mti_attr; + struct mdt_object *ms; + struct mdt_object *mp; + struct mdt_lock_handle *lhs; + struct mdt_lock_handle *lhp; + bool cos_incompat; + int rc; + ENTRY; DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME, PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name)); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) - RETURN(err_serious(-ENOENT)); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) + RETURN(err_serious(-ENOENT)); if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); - /* Invalid case so return error immediately instead of - * processing it */ - if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) - RETURN(-EPERM); + /* Invalid case so return error immediately instead of + * processing it */ + if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) + RETURN(-EPERM); if (!fid_is_md_operative(rr->rr_fid1) || !fid_is_md_operative(rr->rr_fid2)) RETURN(-EPERM); - /* step 1: find & lock the target parent dir */ - lhp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name); - mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, - MDS_INODELOCK_UPDATE); - if (IS_ERR(mp)) - RETURN(PTR_ERR(mp)); - - rc = mdt_version_get_check_save(info, mp, 0); - if (rc) - GOTO(out_unlock_parent, rc); - - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); + /* step 1: find target parent dir */ + mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2); + if (IS_ERR(mp)) + RETURN(PTR_ERR(mp)); - /* step 2: find & lock the source */ - lhs = 
&info->mti_lh[MDT_LH_CHILD]; - mdt_lock_reg_init(lhs, LCK_EX); + rc = mdt_version_get_check_save(info, mp, 0); + if (rc) + GOTO(put_parent, rc); - ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); - if (IS_ERR(ms)) - GOTO(out_unlock_parent, rc = PTR_ERR(ms)); + /* step 2: find source */ + ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); + if (IS_ERR(ms)) + GOTO(put_parent, rc = PTR_ERR(ms)); if (!mdt_object_exists(ms)) { - mdt_object_put(info->mti_env, ms); CDEBUG(D_INFO, "%s: "DFID" does not exist.\n", mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1)); - GOTO(out_unlock_parent, rc = -ENOENT); + GOTO(put_source, rc = -ENOENT); } - rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR); - if (rc != 0) { - mdt_object_put(info->mti_env, ms); - GOTO(out_unlock_parent, rc); - } + cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms)); - /* step 3: link it */ - mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, - OBD_FAIL_MDS_REINT_LINK_WRITE); + lhp = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name); + rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE, + cos_incompat); + if (rc != 0) + GOTO(put_source, rc); + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); + + lhs = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_reg_init(lhs, LCK_EX); + rc = mdt_reint_object_lock(info, ms, lhs, + MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, + cos_incompat); + if (rc != 0) + GOTO(unlock_parent, rc); + + /* step 3: link it */ + mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, + OBD_FAIL_MDS_REINT_LINK_WRITE); tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms)); - rc = mdt_version_get_check_save(info, ms, 1); - if (rc) - GOTO(out_unlock_child, rc); + rc = mdt_version_get_check_save(info, ms, 1); + if (rc) + GOTO(unlock_source, rc); - /** check target version by name during replay */ + /** check target version by name during replay */ rc = mdt_lookup_version_check(info, mp, &rr->rr_name, 
&info->mti_tmp_fid1, 2); - if (rc != 0 && rc != -ENOENT) - GOTO(out_unlock_child, rc); - /* save version of file name for replay, it must be ENOENT here */ - if (!req_is_replay(mdt_info_req(info))) { + if (rc != 0 && rc != -ENOENT) + GOTO(unlock_source, rc); + /* save version of file name for replay, it must be ENOENT here */ + if (!req_is_replay(mdt_info_req(info))) { if (rc != -ENOENT) { CDEBUG(D_INFO, "link target "DNAME" existed!\n", PNAME(&rr->rr_name)); - GOTO(out_unlock_child, rc = -EEXIST); + GOTO(unlock_source, rc = -EEXIST); } - info->mti_ver[2] = ENOENT_VERSION; - mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2); - } + info->mti_ver[2] = ENOENT_VERSION; + mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2); + } rc = mdo_link(info->mti_env, mdt_object_child(mp), - mdt_object_child(ms), &rr->rr_name, ma); + mdt_object_child(ms), &rr->rr_name, ma); - if (rc == 0) + if (rc == 0) mdt_counter_incr(req, LPROC_MDT_LINK); - EXIT; -out_unlock_child: - mdt_object_unlock_put(info, ms, lhs, rc); -out_unlock_parent: - mdt_object_unlock_put(info, mp, lhp, rc); - return rc; + EXIT; +unlock_source: + mdt_object_unlock(info, ms, lhs, rc); +unlock_parent: + mdt_object_unlock(info, mp, lhp, rc); +put_source: + mdt_object_put(info->mti_env, ms); +put_parent: + mdt_object_put(info->mti_env, mp); + return rc; } /** * lock the part of the directory according to the hash of the name @@ -1111,11 +1214,13 @@ out_unlock_parent: */ static int mdt_pdir_hash_lock(struct mdt_thread_info *info, struct mdt_lock_handle *lh, - struct mdt_object *obj, __u64 ibits) + struct mdt_object *obj, __u64 ibits, + bool cos_incompat) { struct ldlm_res_id *res = &info->mti_res_id; struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; + __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; int rc; /* @@ -1126,14 +1231,16 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info, fid_build_pdo_res_name(mdt_object_fid(obj), 
lh->mlh_pdo_hash, res); memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; + if (cos_incompat && + (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) + dlmflags |= LDLM_FL_COS_INCOMPAT; /* * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB, - &info->mti_exp->exp_handle.h_cookie); + res, dlmflags, &info->mti_exp->exp_handle.h_cookie); return rc; } @@ -1335,8 +1442,8 @@ again: * cannot be gotten because of conflicting locks, then drop all * current locks, send an AST to the client, and start again. */ mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name); - rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh, - MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock_try(info, mdt_pobj, &mll->mll_lh, + MDS_INODELOCK_UPDATE, true); if (rc == 0) { mdt_unlock_list(info, lock_list, rc); @@ -1426,8 +1533,8 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, lh_dirp = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name); - rc = mdt_object_lock(info, msrcdir, lh_dirp, - MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock(info, msrcdir, lh_dirp, MDS_INODELOCK_UPDATE, + true); if (rc) GOTO(out_put_parent, rc); @@ -1547,7 +1654,7 @@ out_lease: lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mold, lh_childp, lock_ibits); + rc = mdt_reint_object_lock(info, mold, lh_childp, lock_ibits, true); if (rc != 0) GOTO(out_unlock_child, rc); @@ -1645,7 +1752,10 @@ out_put_new: out_unlock_child: mdt_object_unlock(info, mold, lh_childp, rc); out_unlock_list: - mdt_unlock_list(info, &lock_list, rc); + /* we don't really modify linkea objects, so we can safely decref these + * locks, and this can avoid saving them as COS locks, which may prevent + * subsequent migrate. 
*/ + mdt_unlock_list(info, &lock_list, 1); if (lease != NULL) { ldlm_reprocess_all(lease->l_resource); LDLM_LOCK_PUT(lease); @@ -1689,12 +1799,13 @@ out_put: static int mdt_object_lock_save(struct mdt_thread_info *info, struct mdt_object *dir, struct mdt_lock_handle *lh, - int idx) + int idx, bool cos_incompat) { int rc; /* we lock the target dir if it is local */ - rc = mdt_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE, + cos_incompat); if (rc != 0) return rc; @@ -1703,146 +1814,125 @@ static int mdt_object_lock_save(struct mdt_thread_info *info, return 0; } - -static int mdt_rename_parents_lock(struct mdt_thread_info *info, - struct mdt_object **srcp, - struct mdt_object **tgtp) +/* + * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent; + * 2 - srcdir child; 3 - tgtdir child. + * Update on disk version of srcdir child. + */ +/** + * For DNE phase I, only these renames are allowed + * mv src_p/src_c tgt_p/tgt_c + * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT. + * 2. src_p and tgt_p are same directory, and tgt_c does not + * exists. In this case, all of modification will happen + * in the MDT where ithesource parent is, only one remote + * update is needed, i.e. set c_time/m_time on the child. + * And tgt_c will be still in the same MDT as the original + * src_c. 
+ */ +static int mdt_reint_rename_internal(struct mdt_thread_info *info, + struct mdt_lock_handle *lhc) { struct mdt_reint_record *rr = &info->mti_rr; - const struct lu_fid *fid_src = rr->rr_fid1; - const struct lu_fid *fid_tgt = rr->rr_fid2; - struct mdt_lock_handle *lh_src = &info->mti_lh[MDT_LH_PARENT]; - struct mdt_lock_handle *lh_tgt = &info->mti_lh[MDT_LH_CHILD]; - struct mdt_object *src; - struct mdt_object *tgt; - int reverse = 0; - int rc; + struct md_attr *ma = &info->mti_attr; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_object *msrcdir = NULL; + struct mdt_object *mtgtdir = NULL; + struct mdt_object *mold; + struct mdt_object *mnew = NULL; + struct mdt_lock_handle *lh_srcdirp; + struct mdt_lock_handle *lh_tgtdirp; + struct mdt_lock_handle *lh_oldp = NULL; + struct mdt_lock_handle *lh_newp = NULL; + struct lu_fid *old_fid = &info->mti_tmp_fid1; + struct lu_fid *new_fid = &info->mti_tmp_fid2; + __u64 lock_ibits; + bool reverse = false; + bool cos_incompat; + int rc; ENTRY; + DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME, + PFID(rr->rr_fid1), PNAME(&rr->rr_name), + PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name)); + /* find both parents. */ - src = mdt_object_find_check(info, fid_src, 0); - if (IS_ERR(src)) - RETURN(PTR_ERR(src)); + msrcdir = mdt_object_find_check(info, rr->rr_fid1, 0); + if (IS_ERR(msrcdir)) + RETURN(PTR_ERR(msrcdir)); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); - if (lu_fid_eq(fid_src, fid_tgt)) { - tgt = src; - mdt_object_get(info->mti_env, tgt); + if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) { + mtgtdir = msrcdir; + mdt_object_get(info->mti_env, mtgtdir); } else { - /* Check if the @src is not a child of the @tgt, otherwise a - * reverse locking must take place. */ - rc = mdt_is_subdir(info, src, fid_tgt); + /* Check if the @msrcdir is not a child of the @mtgtdir, + * otherwise a reverse locking must take place. 
*/ + rc = mdt_is_subdir(info, msrcdir, rr->rr_fid2); if (rc == -EINVAL) - reverse = 1; + reverse = true; else if (rc) - GOTO(err_src_put, rc); + GOTO(out_put_srcdir, rc); - tgt = mdt_object_find_check(info, fid_tgt, 1); - if (IS_ERR(tgt)) - GOTO(err_src_put, rc = PTR_ERR(tgt)); + mtgtdir = mdt_object_find_check(info, rr->rr_fid2, 1); + if (IS_ERR(mtgtdir)) + GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir)); } + /* source needs to be looked up after locking source parent, otherwise + * this rename may race with unlink source, and cause rename hang, see + * sanityn.sh 55b, so check parents first, if later we found source is + * remote, relock parents. */ + cos_incompat = (mdt_object_remote(msrcdir) || + mdt_object_remote(mtgtdir)); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); /* lock parents in the proper order. */ + lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; + lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; + +relock: + mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name); + mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name); + if (reverse) { - rc = mdt_object_lock_save(info, tgt, lh_tgt, 1); + rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1, + cos_incompat); if (rc) - GOTO(err_tgt_put, rc); + GOTO(out_put_tgtdir, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5); - rc = mdt_object_lock_save(info, src, lh_src, 0); + rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0, + cos_incompat); } else { - rc = mdt_object_lock_save(info, src, lh_src, 0); + rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0, + cos_incompat); if (rc) - GOTO(err_tgt_put, rc); + GOTO(out_put_tgtdir, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5); - if (tgt != src) - rc = mdt_object_lock_save(info, tgt, lh_tgt, 1); - else if (lh_src->mlh_pdo_hash != lh_tgt->mlh_pdo_hash) { - rc = mdt_pdir_hash_lock(info, lh_tgt, tgt, - MDS_INODELOCK_UPDATE); + if (mtgtdir != msrcdir) { + rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1, + cos_incompat); + } else if (lh_srcdirp->mlh_pdo_hash != + 
lh_tgtdirp->mlh_pdo_hash) { + rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir, + MDS_INODELOCK_UPDATE, + cos_incompat); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10); } } if (rc) - GOTO(err_unlock, rc); + GOTO(out_unlock_parents, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); - - *srcp = src; - *tgtp = tgt; - RETURN(0); - -err_unlock: - /* The order does not matter as the handle is checked inside, - * as well as not used handle. */ - mdt_object_unlock(info, src, lh_src, rc); - mdt_object_unlock(info, tgt, lh_tgt, rc); -err_tgt_put: - mdt_object_put(info->mti_env, tgt); -err_src_put: - mdt_object_put(info->mti_env, src); - RETURN(rc); -} - -/* - * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent; - * 2 - src child; 3 - tgt child. - * Update on disk version of src child. - */ -/** - * For DNE phase I, only these renames are allowed - * mv src_p/src_c tgt_p/tgt_c - * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT. - * 2. src_p and tgt_p are same directory, and tgt_c does not - * exists. In this case, all of modification will happen - * in the MDT where ithesource parent is, only one remote - * update is needed, i.e. set c_time/m_time on the child. - * And tgt_c will be still in the same MDT as the original - * src_c. 
- */ -static int mdt_reint_rename_internal(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) -{ - struct mdt_reint_record *rr = &info->mti_rr; - struct md_attr *ma = &info->mti_attr; - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_object *msrcdir = NULL; - struct mdt_object *mtgtdir = NULL; - struct mdt_object *mold; - struct mdt_object *mnew = NULL; - struct mdt_lock_handle *lh_srcdirp; - struct mdt_lock_handle *lh_tgtdirp; - struct mdt_lock_handle *lh_oldp = NULL; - struct mdt_lock_handle *lh_newp = NULL; - struct lu_fid *old_fid = &info->mti_tmp_fid1; - struct lu_fid *new_fid = &info->mti_tmp_fid2; - __u64 lock_ibits; - int rc; - ENTRY; - - DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME, - PFID(rr->rr_fid1), PNAME(&rr->rr_name), - PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name)); - - lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name); - lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name); - - /* step 1&2: lock the source and target dirs. */ - rc = mdt_rename_parents_lock(info, &msrcdir, &mtgtdir); - if (rc) - RETURN(rc); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5); - /* step 3: find & lock the old object. */ + /* find mold object. */ fid_zero(old_fid); rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2); if (rc != 0) @@ -1868,9 +1958,17 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, /* save version after locking */ mdt_version_get_save(info, mold, 2); - /* step 4: find & lock the new object. 
*/ - /* new target object may not exist now */ - /* lookup with version checking */ + if (!cos_incompat && mdt_object_remote(mold)) { + cos_incompat = true; + mdt_object_put(info->mti_env, mold); + mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN); + mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN); + goto relock; + } + + /* find mnew object: + * mnew target object may not exist now + * lookup with version checking */ fid_zero(new_fid); rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid, 3); @@ -1911,7 +2009,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, lh_oldp = &info->mti_lh[MDT_LH_OLD]; mdt_lock_reg_init(lh_oldp, LCK_EX); - lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR; if (mdt_object_remote(msrcdir)) { /* Enqueue lookup lock from the parent MDT */ @@ -1927,7 +2024,8 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits); + rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits, + cos_incompat); if (rc != 0) GOTO(out_unlock_old, rc); @@ -1944,9 +2042,10 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, lh_newp = &info->mti_lh[MDT_LH_NEW]; mdt_lock_reg_init(lh_newp, LCK_EX); - rc = mdt_object_lock(info, mnew, lh_newp, - MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock(info, mnew, lh_newp, + MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE, + cos_incompat); if (rc != 0) GOTO(out_unlock_old, rc); @@ -1957,7 +2056,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, } else { lh_oldp = &info->mti_lh[MDT_LH_OLD]; mdt_lock_reg_init(lh_oldp, LCK_EX); - lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR; if (mdt_object_remote(msrcdir)) { /* Enqueue lookup lock from the parent MDT */ @@ -1968,14 +2066,15 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, MDS_INODELOCK_LOOKUP, false); if (rc != ELDLM_OK) - 
GOTO(out_put_new, rc); + GOTO(out_put_old, rc); lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits); + rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits, + cos_incompat); if (rc != 0) - GOTO(out_put_old, rc); + GOTO(out_unlock_old, rc); mdt_enoent_version_save(info, 3); } @@ -2018,8 +2117,12 @@ out_put_new: out_put_old: mdt_object_put(info->mti_env, mold); out_unlock_parents: - mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc); - mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc); + mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc); + mdt_object_unlock(info, msrcdir, lh_srcdirp, rc); +out_put_tgtdir: + mdt_object_put(info->mti_env, mtgtdir); +out_put_srcdir: + mdt_object_put(info->mti_env, msrcdir); return rc; } diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 26996f6..65efd68 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -904,6 +904,8 @@ static int osp_md_object_lock(const struct lu_env *env, if (einfo->ei_nonblock) flags |= LDLM_FL_BLOCK_NOWAIT; + if (einfo->ei_mode & (LCK_EX | LCK_PW)) + flags |= LDLM_FL_COS_INCOMPAT; req = ldlm_enqueue_pack(osp->opd_exp, 0); if (IS_ERR(req)) @@ -923,13 +925,6 @@ static int osp_md_object_lock(const struct lu_env *env, &flags, NULL, 0, LVB_T_NONE, lh, 0); ptlrpc_req_finished(req); - if (rc == ELDLM_OK) { - struct ldlm_lock *lock; - - lock = __ldlm_handle2lock(lh, 0); - ldlm_set_cbpending(lock); - LDLM_LOCK_PUT(lock); - } return rc == ELDLM_OK ? 0 : -EIO; } diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index f7a1171..ac96517 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -1284,6 +1284,7 @@ int tgt_sync(const struct lu_env *env, struct lu_target *tgt, tgt->lut_obd->obd_last_committed) { rc = dt_object_sync(env, obj, start, end); } + atomic_inc(&tgt->lut_sync_count); RETURN(rc); } @@ -1292,14 +1293,27 @@ EXPORT_SYMBOL(tgt_sync); * Unified target DLM handlers. 
*/ -/* Ensure that data and metadata are synced to the disk when lock is cancelled - * (if requested) */ +/** + * Unified target BAST + * + * Ensure data and metadata are synced to disk when lock is canceled if Sync on + * Cancel (SOC) is enabled. If it's extent lock, normally sync obj is enough, + * but if it's cross-MDT lock, because remote object version is not set, a + * filesystem sync is needed. + * + * \param lock server side lock + * \param desc lock desc + * \param data ldlm_cb_set_arg + * \param flag indicates whether this is a cancelling or blocking callback + * \retval 0 on success + * \retval negative number on error + */ static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { struct lu_env env; struct lu_target *tgt; - struct dt_object *obj; + struct dt_object *obj = NULL; struct lu_fid fid; int rc = 0; @@ -1314,10 +1328,12 @@ static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, } if (flag == LDLM_CB_CANCELING && - (lock->l_granted_mode & (LCK_PW | LCK_GROUP)) && + (lock->l_granted_mode & (LCK_EX | LCK_PW | LCK_GROUP)) && (tgt->lut_sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL || (tgt->lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL && - lock->l_flags & LDLM_FL_CBPENDING))) { + ldlm_is_cbpending(lock))) && + ((exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) || + lock->l_resource->lr_type == LDLM_EXTENT)) { __u64 start = 0; __u64 end = OBD_OBJECT_EOF; @@ -1327,14 +1343,15 @@ static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, ost_fid_from_resid(&fid, &lock->l_resource->lr_name, tgt->lut_lsd.lsd_osd_index); - obj = dt_locate(&env, tgt->lut_bottom, &fid); - if (IS_ERR(obj)) - GOTO(err_env, rc = PTR_ERR(obj)); - - if (!dt_object_exists(obj)) - GOTO(err_put, rc = -ENOENT); if (lock->l_resource->lr_type == LDLM_EXTENT) { + obj = dt_locate(&env, tgt->lut_bottom, &fid); + if (IS_ERR(obj)) + GOTO(err_env, rc = PTR_ERR(obj)); + + if (!dt_object_exists(obj))
+ GOTO(err_put, rc = -ENOENT); + start = lock->l_policy_data.l_extent.start; end = lock->l_policy_data.l_extent.end; } @@ -1348,7 +1365,8 @@ static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, lock->l_policy_data.l_extent.end, rc); } err_put: - lu_object_put(&env, &obj->do_lu); + if (obj != NULL) + lu_object_put(&env, &obj->do_lu); err_env: lu_env_fini(&env); } diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index 824a9c7..8d2fc37 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -287,4 +287,6 @@ int sub_thandle_trans_create(const struct lu_env *env, void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new); int top_trans_create_tmt(const struct lu_env *env, struct top_thandle *top_th); + +void tgt_cancel_slc_locks(__u64 transno); #endif /* _TG_INTERNAL_H */ diff --git a/lustre/target/tgt_lastrcvd.c b/lustre/target/tgt_lastrcvd.c index 1c8f206..bf44695 100644 --- a/lustre/target/tgt_lastrcvd.c +++ b/lustre/target/tgt_lastrcvd.c @@ -781,7 +781,9 @@ static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th, if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) { ccb->llcc_exp->exp_last_committed = ccb->llcc_transno; spin_unlock(&ccb->llcc_tgt->lut_translock); + ptlrpc_commit_replies(ccb->llcc_exp); + tgt_cancel_slc_locks(ccb->llcc_transno); } else { spin_unlock(&ccb->llcc_tgt->lut_translock); } diff --git a/lustre/target/tgt_main.c b/lustre/target/tgt_main.c index 0f0fcd3..391f295 100644 --- a/lustre/target/tgt_main.c +++ b/lustre/target/tgt_main.c @@ -37,6 +37,113 @@ #include "tgt_internal.h" #include "../ptlrpc/ptlrpc_internal.h" +static spinlock_t uncommitted_slc_locks_guard; +static struct list_head uncommitted_slc_locks; + +/* + * Save cross-MDT lock in uncommitted_slc_locks. 
+ * + * Lock R/W count is not saved, but released in unlock (not canceled remotely), + * instead only a refcount is taken, so that the remote MDT where the object + * resides can detect conflict with this lock there. + * + * \param lock cross-MDT lock to save + * \param transno when the transaction with this transno is committed, this lock + * can be canceled. + */ +void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno) +{ + spin_lock(&uncommitted_slc_locks_guard); + lock_res_and_lock(lock); + if (ldlm_is_cbpending(lock)) { + /* if it was canceled by server, don't save, because remote MDT + * will do Sync-on-Cancel. */ + LDLM_LOCK_PUT(lock); + } else { + lock->l_transno = transno; + /* if this lock is in the list already, there are two operations + * both use this lock, and save it after use, so for the second + * one, just put the refcount. */ + if (list_empty(&lock->l_slc_link)) + list_add_tail(&lock->l_slc_link, + &uncommitted_slc_locks); + else + LDLM_LOCK_PUT(lock); + } + unlock_res_and_lock(lock); + spin_unlock(&uncommitted_slc_locks_guard); +} +EXPORT_SYMBOL(tgt_save_slc_lock); + +/* + * Discard cross-MDT lock from uncommitted_slc_locks. + * + * This is called upon BAST, just remove lock from uncommitted_slc_locks and put + * lock refcount. The BAST will cancel this lock. + * + * \param lock cross-MDT lock to discard + */ +void tgt_discard_slc_lock(struct ldlm_lock *lock) +{ + spin_lock(&uncommitted_slc_locks_guard); + lock_res_and_lock(lock); + /* may race with tgt_cancel_slc_locks() */ + if (lock->l_transno != 0) { + LASSERT(!list_empty(&lock->l_slc_link)); + LASSERT(ldlm_is_cbpending(lock)); + list_del_init(&lock->l_slc_link); + lock->l_transno = 0; + LDLM_LOCK_PUT(lock); + } + unlock_res_and_lock(lock); + spin_unlock(&uncommitted_slc_locks_guard); +} +EXPORT_SYMBOL(tgt_discard_slc_lock); + +/* + * Cancel cross-MDT locks upon transaction commit. + * + * Remove cross-MDT locks from uncommitted_slc_locks, cancel them and put lock + * refcount.
+ * + * \param transno transaction with this number was committed. + */ +void tgt_cancel_slc_locks(__u64 transno) +{ + struct ldlm_lock *lock, *next; + LIST_HEAD(list); + struct lustre_handle lockh; + + spin_lock(&uncommitted_slc_locks_guard); + list_for_each_entry_safe(lock, next, &uncommitted_slc_locks, + l_slc_link) { + lock_res_and_lock(lock); + LASSERT(lock->l_transno != 0); + if (lock->l_transno > transno) { + unlock_res_and_lock(lock); + continue; + } + /* ouch, another operation is using it after it's saved */ + if (lock->l_readers != 0 || lock->l_writers != 0) { + unlock_res_and_lock(lock); + continue; + } + /* set CBPENDING so that this lock won't be used again */ + ldlm_set_cbpending(lock); + lock->l_transno = 0; + list_move(&lock->l_slc_link, &list); + unlock_res_and_lock(lock); + } + spin_unlock(&uncommitted_slc_locks_guard); + + list_for_each_entry_safe(lock, next, &list, l_slc_link) { + list_del_init(&lock->l_slc_link); + ldlm_lock2handle(lock, &lockh); + ldlm_cli_cancel(&lockh, LCF_ASYNC); + LDLM_LOCK_PUT(lock); + } +} + int tgt_init(const struct lu_env *env, struct lu_target *lut, struct obd_device *obd, struct dt_device *dt, struct tgt_opc_slice *slice, int request_fail_id, @@ -146,6 +253,8 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, if (rc < 0) GOTO(out, rc); + atomic_set(&lut->lut_sync_count, 0); + RETURN(0); out: @@ -295,6 +404,9 @@ int tgt_mod_init(void) update_info_init(); + spin_lock_init(&uncommitted_slc_locks_guard); + INIT_LIST_HEAD(&uncommitted_slc_locks); + RETURN(0); } diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index 5fe683c..7a6d88e 100755 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -4753,6 +4753,59 @@ test_70d() { } run_test 70d "stop MDT1, mkdir succeed, create remote dir fail" +test_70e() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + + [ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.7.62) ] || + { skip "Need MDS version at least 
2.7.62"; return 0; } + + cleanup || error "cleanup failed with $?" + + local mdsdev=$(mdsdevname 1) + local ostdev=$(ostdevname 1) + local mdsvdev=$(mdsvdevname 1) + local ostvdev=$(ostvdevname 1) + local opts_mds="$(mkfs_opts mds1 $mdsdev) --reformat $mdsdev $mdsvdev" + local opts_ost="$(mkfs_opts ost1 $ostdev) --reformat $ostdev $ostvdev" + + add mds1 $opts_mds || error "add mds1 failed" + start_mdt 1 || error "start mdt1 failed" + add ost1 $opts_ost || error "add ost1 failed" + start_ost || error "start ost failed" + mount_client $MOUNT > /dev/null || error "mount client $MOUNT failed" + + local soc=$(do_facet mds1 "$LCTL get_param -n \ + mdt.*MDT0000.sync_lock_cancel") + [ $soc == "never" ] || error "SoC enabled on single MDS" + + for i in $(seq 2 $MDSCOUNT); do + mdsdev=$(mdsdevname $i) + mdsvdev=$(mdsvdevname $i) + opts_mds="$(mkfs_opts mds$i $mdsdev) --reformat $mdsdev \ + $mdsvdev" + add mds$i $opts_mds || error "add mds$i failed" + start_mdt $i || error "start mdt$i fail" + done + + wait_dne_interconnect + + for i in $(seq $MDSCOUNT); do + soc=$(do_facet mds$i "$LCTL get_param -n \ + mdt.*MDT000$((i - 1)).sync_lock_cancel") + [ $soc == "blocking" ] || error "SoC not enabled on DNE" + done + + for i in $(seq 2 $MDSCOUNT); do + stop_mdt $i || error "stop mdt$i fail" + done + soc=$(do_facet mds1 "$LCTL get_param -n \ + mdt.*MDT0000.sync_lock_cancel") + [ $soc == "never" ] || error "SoC enabled on single MDS" + + cleanup || error "cleanup failed with $?" 
+} +run_test 70e "Sync-on-Cancel will be enabled by default on DNE" + test_71a() { [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return if combined_mgs_mds; then diff --git a/lustre/tests/sanityn.sh b/lustre/tests/sanityn.sh index dbaffb0..5bb7c8a 100644 --- a/lustre/tests/sanityn.sh +++ b/lustre/tests/sanityn.sh @@ -925,7 +925,7 @@ test_33b() { avgjbd=0 avgtime=0 for i in 1 2 3; do - do_node $CLIENT1 "$LFS mkdir -i $MDTIDX -p \ + do_node $CLIENT1 "$LFS mkdir -i $MDTIDX \ $DIR1/$tdir-\\\$(hostname)-$i" jbdold=$(print_jbd_stat) @@ -958,6 +958,118 @@ test_33b() { } run_test 33b "COS: cross create/delete, 2 clients, benchmark under remote dir" +test_33c() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.63) ] && + skip "DNE CoS not supported" && return + + sync + + mkdir $DIR/$tdir + # remote mkdir is done on MDT2, which enqueued lock of $tdir on MDT1 + $LFS mkdir -i 1 $DIR/$tdir/d1 + do_facet mds1 "lctl set_param -n mdt.*.sync_count=0" + mkdir $DIR/$tdir/d2 + local sync_count=$(do_facet mds1 \ + "lctl get_param -n mdt.*MDT0000.sync_count") + [ $sync_count -eq 1 ] || error "Sync-Lock-Cancel not triggered" + + $LFS mkdir -i 1 $DIR/$tdir/d3 + do_facet mds1 "lctl set_param -n mdt.*.sync_count=0" + # during sleep remote mkdir should have been committed and canceled + # remote lock spontaneously, which shouldn't trigger sync + sleep 6 + mkdir $DIR/$tdir/d4 + local sync_count=$(do_facet mds1 \ + "lctl get_param -n mdt.*MDT0000.sync_count") + [ $sync_count -eq 0 ] || error "Sync-Lock-Cancel triggered" +} +run_test 33c "Cancel cross-MDT lock should trigger Sync-Lock-Cancel" + +ops_do_cos() { + local nodes=$(comma_list $(mdts_nodes)) + do_nodes $nodes "lctl set_param -n mdt.*.async_commit_count=0" + sh -c "$@" + local async_commit_count=$(do_nodes $nodes \ + "lctl get_param -n mdt.*.async_commit_count" | calc_sum) + [ $async_commit_count -gt 0 ] || error "CoS not triggerred" + + rm -rf $DIR/$tdir + sync 
+} + +test_33d() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.63) ] && + skip "DNE CoS not supported" && return + + sync + # remote directory create + mkdir $DIR/$tdir + ops_do_cos "$LFS mkdir -i 1 $DIR/$tdir/subdir" + # remote directory unlink + $LFS mkdir -i 1 $DIR/$tdir + ops_do_cos "rmdir $DIR/$tdir" + # striped directory create + mkdir $DIR/$tdir + ops_do_cos "$LFS mkdir -c 2 $DIR/$tdir/subdir" + # striped directory setattr + $LFS mkdir -c 2 $DIR/$tdir + touch $DIR/$tdir + ops_do_cos "chmod 713 $DIR/$tdir" + # striped directory unlink + $LFS mkdir -c 2 $DIR/$tdir + touch $DIR/$tdir + ops_do_cos "rmdir $DIR/$tdir" + # cross-MDT link + $LFS mkdir -c 2 $DIR/$tdir + $LFS mkdir -i 0 $DIR/$tdir/d1 + $LFS mkdir -i 1 $DIR/$tdir/d2 + touch $DIR/$tdir/d1/tgt + ops_do_cos "ln $DIR/$tdir/d1/tgt $DIR/$tdir/d2/src" + # cross-MDT rename + $LFS mkdir -c 2 $DIR/$tdir + $LFS mkdir -i 0 $DIR/$tdir/d1 + $LFS mkdir -i 1 $DIR/$tdir/d2 + touch $DIR/$tdir/d1/src + ops_do_cos "mv $DIR/$tdir/d1/src $DIR/$tdir/d2/tgt" + # migrate + $LFS mkdir -i 0 $DIR/$tdir + ops_do_cos "$LFS migrate -m 1 $DIR/$tdir" + return 0 +} +run_test 33d "DNE distributed operation should trigger COS" + +test_33e() { + [ -n "$CLIENTS" ] || { skip "Need two or more clients" && return 0; } + [ $CLIENTCOUNT -ge 2 ] || + { skip "Need two or more clients, have $CLIENTCOUNT" && + return 0; } + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.63) ] && + skip "DNE CoS not supported" && return + + local client2=${CLIENT2:-$(hostname)} + + sync + + local nodes=$(comma_list $(mdts_nodes)) + do_nodes $nodes "lctl set_param -n mdt.*.async_commit_count=0" + + $LFS mkdir -c 2 $DIR/$tdir + mkdir $DIR/$tdir/subdir + echo abc > $DIR/$tdir/$tfile + do_node $client2 echo dfg >> $DIR/$tdir/$tfile + do_node $client2 touch $DIR/$tdir/subdir + + local async_commit_count=$(do_nodes $nodes \ + 
"lctl get_param -n mdt.*.async_commit_count" | calc_sum) + [ $async_commit_count -gt 0 ] && error "CoS triggerred" + + return 0 +} +run_test 33e "DNE local operation shouldn't trigger COS" + # End commit on sharing tests get_ost_lock_timeouts() { @@ -3434,6 +3546,11 @@ run_test 91 "chmod and unlink striped directory" log "cleanup: ======================================================" +# kill and wait in each test only guarentee script finish, but command in script +# like 'rm' 'chmod' may still be running, wait for all commands to finish +# otherwise umount below will fail +wait_update $HOSTNAME "fuser -m $MOUNT2" "" || true + [ "$(mount | grep $MOUNT2)" ] && umount $MOUNT2 complete $SECONDS diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index f7feed1..2e5f9bd 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -3017,6 +3017,7 @@ static int cb_migrate_mdt_init(char *path, DIR *parent, DIR **dirp, int fd; int ret; char *filename; + bool retry = false; LASSERT(parent != NULL || dirp != NULL); if (dirp != NULL) @@ -3047,8 +3048,19 @@ static int cb_migrate_mdt_init(char *path, DIR *parent, DIR **dirp, goto out; } +migrate: ret = ioctl(fd, LL_IOC_MIGRATE, rawbuf); if (ret != 0) { + if (errno == EBUSY && !retry) { + /* because migrate may not be able to lock all involved + * objects in order, for some of them it try lock, while + * there may be conflicting COS locks and cause migrate + * fail with EBUSY, hope a sync() could cause + * transaction commit and release these COS locks. */ + sync(); + retry = true; + goto migrate; + } ret = -errno; fprintf(stderr, "%s migrate failed: %s (%d)\n", path, strerror(-ret), ret); -- 1.8.3.1