From 5e9b1828ee5a9cb92b85005c0a34962bee56d137 Mon Sep 17 00:00:00 2001 From: zam Date: Thu, 6 Nov 2008 18:11:14 +0000 Subject: [PATCH] Branch HEAD b=15393 i=alex.zhuravlev@sun.com i=tappro@sun.com Commit on sharing. Eliminate inter-client dependencies between uncommitted transactions by doing transaction commits. Thereby clients may recover independently. --- lnet/lnet/lib-msg.c | 3 + lustre/ChangeLog | 6 ++ lustre/include/dt_object.h | 10 ++ lustre/include/lustre/lustre_idl.h | 3 +- lustre/include/lustre_dlm.h | 7 ++ lustre/include/lustre_net.h | 5 +- lustre/include/obd_support.h | 1 + lustre/ldlm/ldlm_inodebits.c | 17 ++- lustre/ldlm/ldlm_lib.c | 25 ++++- lustre/ldlm/ldlm_lock.c | 39 +++++-- lustre/ldlm/ldlm_lockd.c | 3 + lustre/ldlm/ldlm_request.c | 70 ++++++++---- lustre/lvfs/lvfs_linux.c | 1 - lustre/mdt/mdt_handler.c | 216 ++++++++++++++++++++++++++++++++++--- lustre/mdt/mdt_internal.h | 17 ++- lustre/mdt/mdt_lproc.c | 24 +++++ lustre/mdt/mdt_recovery.c | 5 +- lustre/mdt/mdt_reint.c | 4 +- lustre/mgs/mgs_handler.c | 2 +- lustre/obdclass/obd_config.c | 8 +- lustre/obdecho/echo.c | 2 +- lustre/obdfilter/filter.c | 2 +- lustre/osd/osd_handler.c | 23 ++++ lustre/ost/ost_handler.c | 8 +- lustre/ptlrpc/events.c | 4 +- lustre/ptlrpc/niobuf.c | 3 +- lustre/ptlrpc/service.c | 25 ++++- lustre/ptlrpc/wiretest.c | 4 +- lustre/tests/replay-dual.sh | 26 +++++ lustre/tests/test-framework.sh | 10 ++ lustre/utils/wiretest.c | 4 +- 31 files changed, 499 insertions(+), 78 deletions(-) diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 28aea30..68286b3 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -45,6 +45,8 @@ void lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev) { + ENTRY; + memset(ev, 0, sizeof(*ev)); ev->status = 0; @@ -52,6 +54,7 @@ lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev) ev->type = LNET_EVENT_UNLINK; lnet_md_deconstruct(md, &ev->md); lnet_md2handle(&ev->md_handle, md); + EXIT; } void diff --git a/lustre/ChangeLog 
b/lustre/ChangeLog index 65d8ae7..bd543fd 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -13,6 +13,12 @@ tbd Sun Microsystems, Inc. removed cwd "./" (refer to Bugzilla 14399). * File join has been disabled in this release, refer to Bugzilla 16929. +Severity : enhancement +Bugzilla : 15393 +Description: Commit on sharing. Eliminate inter-client dependencies between + uncommitted transactions by doing transaction commits. + Thereby clients may recover independently. + Severity : normal Frequency : Create a symlink file with a very long name Bugzilla : 16578 diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 760cee5..b1fa210 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -128,6 +128,16 @@ struct dt_device_operations { int (*dt_sync)(const struct lu_env *env, struct dt_device *dev); void (*dt_ro)(const struct lu_env *env, struct dt_device *dev); /** + * Start a transaction commit asynchronously + * + * \param env environment + * \param dev dt_device to start commit on + * + * \return 0 success, negative value if error + */ + int (*dt_commit_async)(const struct lu_env *env, + struct dt_device *dev); + /** * Initialize capability context. 
*/ int (*dt_init_capa_ctxt)(const struct lu_env *env, diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 02c19d8..c676d4d 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1728,10 +1728,11 @@ typedef enum { LCK_CR = 16, LCK_NL = 32, LCK_GROUP = 64, + LCK_COS = 128, LCK_MAXMODE } ldlm_mode_t; -#define LCK_MODE_NUM 7 +#define LCK_MODE_NUM 8 typedef enum { LDLM_PLAIN = 10, diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 368ee21..1bd1c53 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -201,6 +201,7 @@ typedef enum { #define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW) #define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX | LCK_GROUP) #define LCK_COMPAT_GROUP (LCK_GROUP | LCK_NL) +#define LCK_COMPAT_COS (LCK_COS) extern ldlm_mode_t lck_compat_array[]; @@ -669,6 +670,9 @@ struct ldlm_lock { * Server-side-only members. */ + /* connection cookie for the client that originated the operation */ + __u64 l_client_cookie; + /** * Protected by elt_lock. Callbacks pending. 
*/ @@ -963,6 +967,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct lustre_handle *); struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, __u32 *flags); +void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode); void ldlm_lock_cancel(struct ldlm_lock *lock); void ldlm_cancel_locks_for_export(struct obd_export *export); void ldlm_reprocess_all(struct ldlm_resource *res); @@ -1027,6 +1032,7 @@ struct ldlm_callback_suite { /* ldlm_request.c */ int ldlm_expired_completion_wait(void *data); +int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock); int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag); int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp); @@ -1062,6 +1068,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_completion_callback completion, ldlm_glimpse_callback glimpse, void *data, __u32 lvb_len, void *lvb_swabber, + const __u64 *client_cookie, struct lustre_handle *lockh); int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len); diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index a4377b5..ff6a122 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -260,6 +260,8 @@ struct ptlrpc_reply_state { #endif /* updates to following flag serialised by srv_request_lock */ unsigned long rs_difficult:1; /* ACK/commit stuff */ + unsigned long rs_no_ack:1; /* no ACK, even for + difficult requests */ unsigned long rs_scheduled:1; /* being handled? */ unsigned long rs_scheduled_ever:1;/* any schedule attempts? */ unsigned long rs_handled:1; /* been handled yet? 
*/ @@ -661,6 +663,7 @@ struct ptlrpc_service { int srv_watchdog_factor; /* soft watchdog timeout mutiplier */ unsigned srv_cpu_affinity:1; /* bind threads to CPUs */ unsigned srv_at_check:1; /* check early replies */ + unsigned srv_is_stopping:1; /* under unregister_service */ cfs_time_t srv_at_checktime; /* debug */ __u32 srv_req_portal; @@ -954,7 +957,7 @@ struct ptlrpc_service_conf { /* ptlrpc/service.c */ void ptlrpc_save_lock (struct ptlrpc_request *req, - struct lustre_handle *lock, int mode); + struct lustre_handle *lock, int mode, int no_ack); void ptlrpc_commit_replies (struct obd_device *obd); void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs); struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c, diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index fbd4a96..14bd5a8 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -190,6 +190,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_MDS_LOV_SYNC_RACE 0x13e #define OBD_FAIL_MDS_CLOSE_NET_REP 0x13f #define OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x140 +#define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x141 #define OBD_FAIL_OST 0x200 #define OBD_FAIL_OST_CONNECT_NET 0x201 diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 07014fb..548ee14 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -86,7 +86,22 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, tmp = mode_tail; continue; } - + + if (lock->l_req_mode == LCK_COS) { + if (lock->l_client_cookie == req->l_client_cookie) { + tmp = mode_tail; + } else { + tmp = mode_tail; + if (!work_list) + RETURN(0); + compat = 0; + if (lock->l_blocking_ast) + ldlm_add_ast_work_item(lock, req, + work_list); + } + continue; + } + for (;;) { struct list_head *head; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index e739721..5e3e9d6 100644 --- 
a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -1393,6 +1393,11 @@ static int check_for_next_transno(struct obd_device *obd) next_transno, queue_len, completed, connected, req_transno); obd->obd_next_recovery_transno = req_transno; wake_up = 1; + } else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS)) { + CDEBUG(D_HA, "accepting transno gaps is explicitly allowed" + " by fail_lock, waking up ("LPD64")\n", next_transno); + obd->obd_next_recovery_transno = req_transno; + wake_up = 1; } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) { /* some clients haven't connected in time, but we can try * to replay requests that demand on already committed ones @@ -2044,15 +2049,19 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) struct obd_device *obd; struct obd_export *exp; struct ptlrpc_service *svc; + ENTRY; - if (req->rq_no_reply) + if (req->rq_no_reply) { + EXIT; return; + } svc = req->rq_rqbd->rqbd_service; rs = req->rq_reply_state; if (rs == NULL || !rs->rs_difficult) { /* no notifiers */ target_send_reply_msg (req, rc, fail_id); + EXIT; return; } @@ -2082,6 +2091,8 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) spin_lock(&obd->obd_uncommitted_replies_lock); + CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n", + rs->rs_transno, obd->obd_last_committed); if (rs->rs_transno > obd->obd_last_committed) { /* not committed already */ list_add_tail (&rs->rs_obd_list, @@ -2112,9 +2123,11 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) atomic_inc (&svc->srv_outstanding_replies); } - if (!rs->rs_on_net || /* some notifier */ - list_empty(&rs->rs_exp_list) || /* completed already */ - list_empty(&rs->rs_obd_list)) { + if (rs->rs_transno <= obd->obd_last_committed || + (!rs->rs_on_net && !rs->rs_no_ack) || + list_empty(&rs->rs_exp_list) || /* completed already */ + list_empty(&rs->rs_obd_list)) { + CDEBUG(D_HA, "Schedule reply immediately\n"); 
list_add_tail (&rs->rs_list, &svc->srv_reply_queue); cfs_waitq_signal (&svc->srv_waitq); } else { @@ -2123,6 +2136,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) } spin_unlock(&svc->srv_lock); + EXIT; } int target_handle_ping(struct ptlrpc_request *req) @@ -2250,7 +2264,8 @@ ldlm_mode_t lck_compat_array[] = { [LCK_CW] LCK_COMPAT_CW, [LCK_CR] LCK_COMPAT_CR, [LCK_NL] LCK_COMPAT_NL, - [LCK_GROUP] LCK_COMPAT_GROUP + [LCK_GROUP] LCK_COMPAT_GROUP, + [LCK_COS] LCK_COMPAT_COS, }; /** diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 2b8c9bf..1ea9a7b 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -62,7 +62,8 @@ char *ldlm_lockname[] = { [LCK_CW] "CW", [LCK_CR] "CR", [LCK_NL] "NL", - [LCK_GROUP] "GROUP" + [LCK_GROUP] "GROUP", + [LCK_COS] "COS" }; char *ldlm_typename[] = { @@ -592,7 +593,7 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode) lock->l_readers++; lu_ref_add_atomic(&lock->l_reference, "reader", lock); } - if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) { + if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) { lock->l_writers++; lu_ref_add_atomic(&lock->l_reference, "writer", lock); } @@ -648,7 +649,7 @@ void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode) lu_ref_del(&lock->l_reference, "reader", lock); lock->l_readers--; } - if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) { + if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) { LASSERT(lock->l_writers > 0); lu_ref_del(&lock->l_reference, "writer", lock); lock->l_writers--; @@ -1447,10 +1448,10 @@ ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) ldlm_lock2desc(lock->l_blocking_lock, &d); - LDLM_LOCK_RELEASE(lock->l_blocking_lock); - lock->l_blocking_lock = NULL; lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING); + LDLM_LOCK_RELEASE(lock->l_blocking_lock); + lock->l_blocking_lock = NULL; LDLM_LOCK_RELEASE(lock); RETURN(1); @@ -1739,6 
+1740,32 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp) ldlm_cancel_locks_for_export_cb, exp); } +/** + * Downgrade an exclusive lock. + * + * A fast variant of ldlm_lock_convert for conversion of exclusive + * locks. The conversion is always successful. + * + * \param lock A lock to convert + * \param new_mode new lock mode + */ +void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode) +{ + ENTRY; + + LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX)); + LASSERT(new_mode == LCK_COS); + + lock_res_and_lock(lock); + ldlm_resource_unlink_lock(lock); + lock->l_req_mode = new_mode; + ldlm_grant_lock(lock, NULL); + unlock_res_and_lock(lock); + ldlm_reprocess_all(lock->l_resource); + + EXIT; +} + struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, __u32 *flags) { @@ -1763,7 +1790,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, if (node == NULL) /* Actually, this causes EDEADLOCK to be returned */ RETURN(NULL); - LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR, + LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR), "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode); lock_res_and_lock(lock); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index ee466e8..7891e7a 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -2410,9 +2410,12 @@ EXPORT_SYMBOL(ldlm_lock_dump_handle); EXPORT_SYMBOL(ldlm_cancel_locks_for_export); EXPORT_SYMBOL(ldlm_reprocess_all_ns); EXPORT_SYMBOL(ldlm_lock_allow_match); +EXPORT_SYMBOL(ldlm_lock_downgrade); +EXPORT_SYMBOL(ldlm_lock_convert); /* ldlm_request.c */ EXPORT_SYMBOL(ldlm_completion_ast_async); +EXPORT_SYMBOL(ldlm_blocking_ast_nocheck); EXPORT_SYMBOL(ldlm_completion_ast); EXPORT_SYMBOL(ldlm_blocking_ast); EXPORT_SYMBOL(ldlm_glimpse_ast); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 6bf1efe..95ee14d 100644 --- a/lustre/ldlm/ldlm_request.c +++ 
b/lustre/ldlm/ldlm_request.c @@ -276,31 +276,22 @@ noreproc: RETURN(ldlm_completion_tail(lock)); } -/* - * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs. +/** + * A helper to build a blocking ast function + * + * Perform a common operation for blocking asts: + * deferred lock cancellation. + * + * \param lock the lock blocking or canceling ast was called on + * \retval 0 + * \see mdt_blocking_ast + * \see ldlm_blocking_ast */ -int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag) +int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock) { int do_ast; ENTRY; - if (flag == LDLM_CB_CANCELING) { - /* Don't need to do anything here. */ - RETURN(0); - } - - lock_res_and_lock(lock); - /* Get this: if ldlm_blocking_ast is racing with intent_policy, such - * that ldlm_blocking_ast is called just before intent_policy method - * takes the ns_lock, then by the time we get the lock, we might not - * be the correct blocking function anymore. So check, and return - * early, if so. */ - if (lock->l_blocking_ast != ldlm_blocking_ast) { - unlock_res_and_lock(lock); - RETURN(0); - } - lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); unlock_res_and_lock(lock); @@ -321,6 +312,42 @@ int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(0); } +/** + * Server blocking AST + * + * ->l_blocking_ast() callback for LDLM locks acquired by server-side + * OBDs. + * + * \param lock the lock which blocks a request or cancelling lock + * \param desc unused + * \param data unused + * \param flag indicates whether this is a cancelling or blocking callback + * \retval 0 + * \see ldlm_blocking_ast_nocheck + */ +int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) +{ + ENTRY; + + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. 
*/ + RETURN(0); + } + + lock_res_and_lock(lock); + /* Get this: if ldlm_blocking_ast is racing with intent_policy, such + * that ldlm_blocking_ast is called just before intent_policy method + * takes the ns_lock, then by the time we get the lock, we might not + * be the correct blocking function anymore. So check, and return + * early, if so. */ + if (lock->l_blocking_ast != ldlm_blocking_ast) { + unlock_res_and_lock(lock); + RETURN(0); + } + RETURN(ldlm_blocking_ast_nocheck(lock)); +} + /* * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See * comment in filter_intent_policy() on why you may need this. @@ -356,6 +383,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_completion_callback completion, ldlm_glimpse_callback glimpse, void *data, __u32 lvb_len, void *lvb_swabber, + const __u64 *client_cookie, struct lustre_handle *lockh) { struct ldlm_lock *lock; @@ -387,6 +415,8 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, unlock_res_and_lock(lock); if (policy != NULL) lock->l_policy_data = *policy; + if (client_cookie != NULL) + lock->l_client_cookie = *client_cookie; if (type == LDLM_EXTENT) lock->l_req_extent = policy->l_extent; diff --git a/lustre/lvfs/lvfs_linux.c b/lustre/lvfs/lvfs_linux.c index 0520730..4c2b1a9 100644 --- a/lustre/lvfs/lvfs_linux.c +++ b/lustre/lvfs/lvfs_linux.c @@ -430,7 +430,6 @@ int dev_check_rdonly(lvfs_sbdev_type dev); void __lvfs_set_rdonly(lvfs_sbdev_type dev, lvfs_sbdev_type jdev) { - lvfs_sbdev_sync(dev); if (jdev && (jdev != dev)) { CDEBUG(D_IOCTL | D_HA, "set journal dev %lx rdonly\n", (long)jdev); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 42a6e78..ffdc67c 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1776,6 +1776,108 @@ struct mdt_object *mdt_object_find(const struct lu_env *env, RETURN(m); } +/** + * Asynchronous commit for mdt device. + * + * Pass asynchronous commit call down the MDS stack. 
+ * + * \param env environment + * \param mdt the mdt device + */ +static void mdt_device_commit_async(const struct lu_env *env, + struct mdt_device *mdt) +{ + struct dt_device *dt = mdt->mdt_bottom; + int rc; + + rc = dt->dd_ops->dt_commit_async(env, dt); + if (unlikely(rc != 0)) + CWARN("async commit start failed with rc = %d", rc); +} + +/** + * Mark the lock as "synchronous". + * + * Mark the lock to defer transaction commit to the unlock time. + * + * \param lock the lock to mark as "synchronous" + * + * \see mdt_is_lock_sync + * \see mdt_save_lock + */ +static inline void mdt_set_lock_sync(struct ldlm_lock *lock) +{ + lock->l_ast_data = (void*)1; +} + +/** + * Check whether the lock is "synchronous" or not. + * + * \param lock the lock to check + * \retval 1 the lock is "synchronous" + * \retval 0 the lock isn't "synchronous" + * + * \see mdt_set_lock_sync + * \see mdt_save_lock + */ +static inline int mdt_is_lock_sync(struct ldlm_lock *lock) +{ + return lock->l_ast_data != NULL; +} + +/** + * Blocking AST for mdt locks. + * + * Starts transaction commit in case of COS lock conflict or + * defers such a commit to the mdt_save_lock. 
+ * + * \param lock the lock which blocks a request or cancelling lock + * \param desc unused + * \param data unused + * \param flag indicates whether this cancelling or blocking callback + * \retval 0 + * \see ldlm_blocking_ast_nocheck + */ +int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) +{ + struct obd_device *obd = lock->l_resource->lr_namespace->ns_obd; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + int rc; + ENTRY; + + if (flag == LDLM_CB_CANCELING) + RETURN(0); + lock_res_and_lock(lock); + if (lock->l_blocking_ast != mdt_blocking_ast) { + unlock_res_and_lock(lock); + RETURN(0); + } + if (mdt_cos_is_enabled(mdt) && + lock->l_req_mode & (LCK_PW | LCK_EX) && + lock->l_blocking_lock != NULL && + lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) { + mdt_set_lock_sync(lock); + } + rc = ldlm_blocking_ast_nocheck(lock); + + /* There is no lock conflict if l_blocking_lock == NULL, + * it indicates a blocking ast sent from ldlm_lock_decref_internal + * when the last reference to a local lock was released */ + if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) { + struct lu_env env; + + rc = lu_env_init(&env, NULL, LCT_MD_THREAD); + if (unlikely(rc != 0)) + CWARN("lu_env initialization failed with rc = %d," + "cannot start asynchronous commit\n", rc); + else + mdt_device_commit_async(&env, mdt); + lu_env_fini(&env); + } + RETURN(rc); +} + int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits, int locality) { @@ -1832,7 +1934,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, */ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, LDLM_FL_ATOMIC_CB); + policy, res_id, LDLM_FL_ATOMIC_CB, + &info->mti_exp->exp_handle.h_cookie); if (unlikely(rc)) RETURN(rc); } @@ -1852,8 +1955,8 @@ int mdt_object_lock(struct mdt_thread_info *info, 
struct mdt_object *o, * fix it up and turn FL_LOCAL flag off. */ rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB); - + res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB, + &info->mti_exp->exp_handle.h_cookie); if (rc) GOTO(out, rc); @@ -1865,36 +1968,79 @@ out: RETURN(rc); } -static inline -void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h, +/** + * Save a lock within request object. + * + * Keep the lock referenced until either client ACK or transaction + * commit happens or release the lock immediately depending on input + * parameters. If COS is ON, a write lock is converted to COS lock + * before saving. + * + * \param info thread info object + * \param h lock handle + * \param mode lock mode + * \param decref force immediate lock releasing + */ +static +void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, ldlm_mode_t mode, int decref) { ENTRY; if (lustre_handle_is_used(h)) { - if (decref) + if (decref || !info->mti_has_trans || + !(mode & (LCK_PW | LCK_EX))){ mdt_fid_unlock(h, mode); - else - ptlrpc_save_lock(req, h, mode); + } else { + struct mdt_device *mdt = info->mti_mdt; + struct ldlm_lock *lock = ldlm_handle2lock(h); + struct ptlrpc_request *req = mdt_info_req(info); + int no_ack = 0; + + LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n", + h->cookie); + CDEBUG(D_HA, "request = %p reply state = %p" + " transno = "LPD64"\n", + req, req->rq_reply_state, req->rq_transno); + if (mdt_cos_is_enabled(mdt)) { + no_ack = 1; + ldlm_lock_downgrade(lock, LCK_COS); + mode = LCK_COS; + } + ptlrpc_save_lock(req, h, mode, no_ack); + if (mdt_is_lock_sync(lock)) { + CDEBUG(D_HA, "found sync-lock," + " async commit started\n"); + mdt_device_commit_async(info->mti_env, + mdt); + } + LDLM_LOCK_PUT(lock); + } h->cookie = 0ull; } EXIT; } -/* - * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock() - * to save this lock in req. 
when transaction committed, req will be released, - * and lock will, too. +/** + * Unlock mdt object. + * + * Immediately release the regular lock and the PDO lock or save the + * lock in request and keep them referenced until client ACK or + * transaction commit. + * + * \param info thread info object + * \param o mdt object + * \param lh mdt lock handle referencing regular and PDO locks + * \param decref force immediate lock releasing */ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, int decref) { - struct ptlrpc_request *req = mdt_info_req(info); ENTRY; - mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); - mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); + mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); + mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); EXIT; } @@ -3388,7 +3534,7 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m) ptlrpc_unregister_service(m->mdt_fld_service); m->mdt_fld_service = NULL; } - ENTRY; + EXIT; } static int mdt_start_ptlrpc_service(struct mdt_device *m) @@ -3944,6 +4090,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_opts.mo_user_xattr = 0; m->mdt_opts.mo_acl = 0; + m->mdt_opts.mo_cos = MDT_COS_DEFAULT; lmi = server_get_mount_2(dev); if (lmi == NULL) { CERROR("Cannot get mount info for %s!\n", dev); @@ -4715,7 +4862,6 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = mdt_device_sync(&env, mdt); break; case OBD_IOC_SET_READONLY: - rc = dt->dd_ops->dt_sync(&env, dt); dt->dd_ops->dt_ro(&env, dt); break; case OBD_IOC_ABORT_RECOVERY: @@ -4837,6 +4983,42 @@ struct md_ucred *mdt_ucred(const struct mdt_thread_info *info) return md_ucred(info->mti_env); } +/** + * Enable/disable COS. + * + * Set/Clear the COS flag in mdt options. 
+ * + * \param mdt mdt device + * \param val 0 disables COS, other values enable COS + */ +void mdt_enable_cos(struct mdt_device *mdt, int val) +{ + struct lu_env env; + int rc; + + mdt->mdt_opts.mo_cos = !!val; + rc = lu_env_init(&env, NULL, LCT_MD_THREAD); + if (unlikely(rc != 0)) { + CWARN("lu_env initialization failed with rc = %d," + "cannot sync\n", rc); + return; + } + mdt_device_sync(&env, mdt); + lu_env_fini(&env); +} + +/** + * Check COS status. + * + * Return COS flag status. + * + * \param mdt mdt device + */ +int mdt_cos_is_enabled(struct mdt_device *mdt) +{ + return mdt->mdt_opts.mo_cos != 0; +} + /* type constructor/destructor: mdt_type_init, mdt_type_fini */ LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 5457a05..4251858 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -127,7 +127,8 @@ struct mdt_device { mo_acl :1, mo_compat_resname:1, mo_mds_capa :1, - mo_oss_capa :1; + mo_oss_capa :1, + mo_cos :1; } mdt_opts; /* mdt state flags */ __u32 mdt_fl_cfglog:1, @@ -180,6 +181,7 @@ struct mdt_device { #define MDT_SERVICE_WATCHDOG_FACTOR (2000) #define MDT_ROCOMPAT_SUPP (OBD_ROCOMPAT_LOVOBJID) #define MDT_INCOMPAT_SUPP (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR) +#define MDT_COS_DEFAULT (1) struct mdt_object { struct lu_object_header mot_header; @@ -680,13 +682,15 @@ static inline int is_identity_get_disabled(struct upcall_cache *cache) return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1; } +int mdt_blocking_ast(struct ldlm_lock*, struct ldlm_lock_desc*, void*, int); + /* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. 
*/ static inline int mdt_fid_lock(struct ldlm_namespace *ns, struct lustre_handle *lh, ldlm_mode_t mode, ldlm_policy_data_t *policy, const struct ldlm_res_id *res_id, - int flags) + int flags, const __u64 *client_cookie) { int rc; @@ -694,9 +698,9 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns, LASSERT(lh != NULL); rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, - mode, &flags, ldlm_blocking_ast, - ldlm_completion_ast, NULL, NULL, - 0, NULL, lh); + mode, &flags, mdt_blocking_ast, + ldlm_completion_ast, + NULL, NULL, 0, NULL, client_cookie, lh); return rc == ELDLM_OK ? 0 : -EIO; } @@ -749,6 +753,9 @@ static inline struct lu_name *mdt_name_copy(struct lu_name *tlname, return tlname; } +void mdt_enable_cos(struct mdt_device *, int); +int mdt_cos_is_enabled(struct mdt_device *); + /* lprocfs stuff */ void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars); int mdt_procfs_init(struct mdt_device *mdt, const char *name); diff --git a/lustre/mdt/mdt_lproc.c b/lustre/mdt/mdt_lproc.c index 171c77a..0e95718 100644 --- a/lustre/mdt/mdt_lproc.c +++ b/lustre/mdt/mdt_lproc.c @@ -425,6 +425,29 @@ static int lprocfs_mdt_wr_evict_client(struct file *file, const char *buffer, return count; } +static int lprocfs_rd_cos(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + + return snprintf(page, count, "%u\n", mdt_cos_is_enabled(mdt)); +} + +static int lprocfs_wr_cos(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + int val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + mdt_enable_cos(mdt, val); + return count; +} + static struct lprocfs_vars lprocfs_mdt_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 }, @@ -447,6 +470,7 @@ static struct 
lprocfs_vars lprocfs_mdt_obd_vars[] = { { "site_stats", lprocfs_rd_site_stats, 0, 0 }, { "evict_client", 0, lprocfs_mdt_wr_evict_client, 0 }, { "hash_stats", lprocfs_obd_rd_hash, 0, 0 }, + { "commit_on_sharing", lprocfs_rd_cos, lprocfs_wr_cos, 0 }, { 0 } }; diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 32bdeb5..74e2402 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -1051,9 +1051,8 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req) libcfs_nid2str(exp->exp_connection->c_peer.nid)); for (i = 0; i < oldrep->rs_nlocks; i++) - ptlrpc_save_lock(req, - &oldrep->rs_locks[i], - oldrep->rs_modes[i]); + ptlrpc_save_lock(req, &oldrep->rs_locks[i], + oldrep->rs_modes[i], 0); oldrep->rs_nlocks = 0; DEBUG_REQ(D_HA, req, "stole locks for"); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index bfdff5f..4c565ab 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -751,7 +751,9 @@ static int mdt_rename_lock(struct mdt_thread_info *info, rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, LCK_EX, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, - NULL, lh); + NULL, + &info->mti_exp->exp_handle.h_cookie, + lh); } else { struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL }; diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index ca41d1a..e724962 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -348,7 +348,7 @@ static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname, LDLM_PLAIN, NULL, LCK_EX, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, - fsname, 0, NULL, lockh); + fsname, 0, NULL, NULL, lockh); if (rc) CERROR("can't take cfg lock for %s (%d)\n", fsname, rc); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 8e3a854..82ce630 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -439,11 +439,15 @@ int 
class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg) obd->obd_fail = 1; obd->obd_no_transno = 1; obd->obd_no_recov = 1; - /* Set the obd readonly if we can */ - if (OBP(obd, iocontrol)) + if (OBP(obd, iocontrol)) { + obd_iocontrol(OBD_IOC_SYNC, + obd->obd_self_export, + 0, NULL, NULL); + /* Set the obd readonly if we can */ obd_iocontrol(OBD_IOC_SET_READONLY, obd->obd_self_export, 0, NULL, NULL); + } break; default: CERROR("unrecognised flag '%c'\n", diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 5c77003..663ac58 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -536,7 +536,7 @@ static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg) rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN, NULL, LCK_NL, &lock_flags, NULL, ldlm_completion_ast, NULL, NULL, - 0, NULL, &obd->u.echo.eo_nl_lock); + 0, NULL, NULL, &obd->u.echo.eo_nl_lock); LASSERT (rc == ELDLM_OK); lprocfs_echo_init_vars(&lvars); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 2809116..c7b65b3 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1545,7 +1545,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid, rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, - &lockh); + NULL, &lockh); /* We only care about the side-effects, just drop the lock. */ if (rc == ELDLM_OK) diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 5d9a155..98a872f 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -703,6 +703,28 @@ static int osd_sync(const struct lu_env *env, struct dt_device *d) return ldiskfs_force_commit(osd_sb(osd_dt_dev(d))); } +/** + * Start commit for OSD device. + * + * An implementation of dt_commit_async method for OSD device. + * Asynchronously starts underlying fs sync and thereby a transaction + * commit. 
+ * + * \param env environment + * \param d dt device + * + * \see dt_device_operations + */ +static int osd_commit_async(const struct lu_env *env, + struct dt_device *d) +{ + struct super_block *s = osd_sb(osd_dt_dev(d)); + ENTRY; + + CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME); + RETURN(s->s_op->sync_fs(s, 0)); +} + /* * Concurrency: shouldn't matter. */ @@ -786,6 +808,7 @@ static const struct dt_device_operations osd_dt_ops = { .dt_conf_get = osd_conf_get, .dt_sync = osd_sync, .dt_ro = osd_ro, + .dt_commit_async = osd_commit_async, .dt_credit_get = osd_credit_get, .dt_init_capa_ctxt = osd_init_capa_ctxt, }; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 1f613e1..2618d84 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -86,7 +86,7 @@ void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req) if (!ack_lock->mode) break; /* XXX not even calling target_send_reply in some cases... */ - ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode); + ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0); } } @@ -253,7 +253,8 @@ static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa, RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, - ldlm_glimpse_ast, NULL, 0, NULL, lh)); + ldlm_glimpse_ast, NULL, 0, NULL, + NULL, lh)); } /* @@ -452,7 +453,8 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp, RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, - ldlm_glimpse_ast, NULL, 0, NULL, lh)); + ldlm_glimpse_ast, NULL, 0, NULL, + NULL, lh)); } static void ost_brw_lock_put(int mode, diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 7cca13f..5133fb6 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -341,7 +341,9 @@ void reply_out_callback(lnet_event_t *ev) * 
until ptlrpc_server_handle_reply() is done with it */ spin_lock(&svc->srv_lock); rs->rs_on_net = 0; - ptlrpc_schedule_difficult_reply (rs); + if (!rs->rs_no_ack || + rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed) + ptlrpc_schedule_difficult_reply (rs); spin_unlock(&svc->srv_lock); } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index d79ad5b..476f666 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -421,7 +421,8 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int flags) req->rq_sent = cfs_time_current_sec(); rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len, - rs->rs_difficult ? LNET_ACK_REQ : LNET_NOACK_REQ, + (rs->rs_difficult && !rs->rs_no_ack) ? + LNET_ACK_REQ : LNET_NOACK_REQ, &rs->rs_cb_id, conn, svc->srv_rep_portal, req->rq_xid, req->rq_reply_off); out: diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 6507f1a..ee8913c 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -169,7 +169,7 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc) void ptlrpc_save_lock (struct ptlrpc_request *req, - struct lustre_handle *lock, int mode) + struct lustre_handle *lock, int mode, int no_ack) { struct ptlrpc_reply_state *rs = req->rq_reply_state; int idx; @@ -181,12 +181,14 @@ ptlrpc_save_lock (struct ptlrpc_request *req, rs->rs_locks[idx] = *lock; rs->rs_modes[idx] = mode; rs->rs_difficult = 1; + rs->rs_no_ack = !!no_ack; } void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs) { struct ptlrpc_service *svc = rs->rs_service; + ENTRY; #ifdef CONFIG_SMP LASSERT (spin_is_locked (&svc->srv_lock)); @@ -194,13 +196,16 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs) LASSERT (rs->rs_difficult); rs->rs_scheduled_ever = 1; /* flag any notification attempt */ - if (rs->rs_scheduled) /* being set up or already notified */ + if (rs->rs_scheduled) { /* being set up or already notified */ + EXIT; return; + } rs->rs_scheduled = 1; list_del 
(&rs->rs_list); list_add (&rs->rs_list, &svc->srv_reply_queue); cfs_waitq_signal (&svc->srv_waitq); + EXIT; } void @@ -208,6 +213,7 @@ ptlrpc_commit_replies (struct obd_device *obd) { struct list_head *tmp; struct list_head *nxt; + ENTRY; /* Find any replies that have been committed and get their service * to attend to complete them. */ @@ -232,6 +238,7 @@ ptlrpc_commit_replies (struct obd_device *obd) } spin_unlock(&obd->obd_uncommitted_replies_lock); + EXIT; } static int @@ -1296,6 +1303,11 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) if (!rs->rs_on_net) { /* Off the net */ svc->srv_n_difficult_replies--; + if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping) + /* wake up threads that are being stopped by + ptlrpc_unregister_service/ptlrpc_stop_threads + and sleep waiting srv_n_difficult_replies == 0 */ + cfs_waitq_broadcast(&svc->srv_waitq); spin_unlock(&svc->srv_lock); class_export_put (exp); @@ -1583,7 +1595,9 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc, struct ptlrpc_thread *thread) { struct l_wait_info lwi = { 0 }; + ENTRY; + CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread); spin_lock(&svc->srv_lock); thread->t_flags = SVC_STOPPING; spin_unlock(&svc->srv_lock); @@ -1597,11 +1611,13 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc, spin_unlock(&svc->srv_lock); OBD_FREE_PTR(thread); + EXIT; } void ptlrpc_stop_all_threads(struct ptlrpc_service *svc) { struct ptlrpc_thread *thread; + ENTRY; spin_lock(&svc->srv_lock); while (!list_empty(&svc->srv_threads)) { @@ -1614,6 +1630,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc) } spin_unlock(&svc->srv_lock); + EXIT; } int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc) @@ -1708,7 +1725,9 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) struct l_wait_info lwi; struct list_head *tmp; struct ptlrpc_reply_state *rs, *t; + ENTRY; + service->srv_is_stopping = 1; cfs_timer_disarm(&service->srv_at_timer); 
ptlrpc_stop_all_threads(service); @@ -1838,7 +1857,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) cfs_timer_disarm(&service->srv_at_timer); OBD_FREE_PTR(service); - return 0; + RETURN(0); } /* Returns 0 if the service is healthy. diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 0df9d23..26c5fb3 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -237,9 +237,9 @@ void lustre_assert_wire_constants(void) (long long)LCK_NL); LASSERTF(LCK_GROUP == 64, " found %lld\n", (long long)LCK_GROUP); - LASSERTF(LCK_MAXMODE == 65, " found %lld\n", + LASSERTF(LCK_MAXMODE == 129, " found %lld\n", (long long)LCK_MAXMODE); - LASSERTF(LCK_MODE_NUM == 7, " found %lld\n", + LASSERTF(LCK_MODE_NUM == 8, " found %lld\n", (long long)LCK_MODE_NUM); CLASSERT(LDLM_PLAIN == 10); CLASSERT(LDLM_EXTENT == 11); diff --git a/lustre/tests/replay-dual.sh b/lustre/tests/replay-dual.sh index dc31f90..80991d5 100755 --- a/lustre/tests/replay-dual.sh +++ b/lustre/tests/replay-dual.sh @@ -403,6 +403,32 @@ test_20() { #16389 } run_test 20 "recovery time is not increasing" +test_21() { + local param_file=$TMP/$tfile-params + + save_lustre_params $(facet_active_host $SINGLEMDS) "mdt.*.commit_on_sharing" > $param_file + do_facet $SINGLEMDS lctl set_param mdt.*.commit_on_sharing=1 + touch $MOUNT1/$tfile-1 + mv $MOUNT2/$tfile-1 $MOUNT2/$tfile-2 + mv $MOUNT1/$tfile-2 $MOUNT1/$tfile-3 + replay_barrier_nosync $SINGLEMDS + umount $MOUNT2 + + facet_failover $SINGLEMDS + + # all renames are replayed + unlink $MOUNT1/$tfile-3 || return 2 + + zconf_mount `hostname` $MOUNT2 || error "mount $MOUNT2 fail" + + do_facet $SINGLEMDS lctl set_param mdt.*.commit_on_sharing=0 + rm -rf $MOUNT1/$tfile-* + restore_lustre_params < $param_file + rm -f $param_file + return 0 +} +run_test 21 "commit on sharing" + equals_msg `basename $0`: test complete, cleaning up SLEEP=$((`date +%s` - $NOW)) [ $SLEEP -lt $TIMEOUT ] && sleep $SLEEP diff --git a/lustre/tests/test-framework.sh 
b/lustre/tests/test-framework.sh index f44a51e..422917b 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -776,6 +776,16 @@ replay_barrier_nodf() { $LCTL mark "local REPLAY BARRIER on ${!svc}" } +replay_barrier_nosync() { + local facet=$1 echo running=${running} + local svc=${facet}_svc + echo Replay barrier on ${!svc} + do_facet $facet $LCTL --device %${!svc} readonly + do_facet $facet $LCTL --device %${!svc} notransno + do_facet $facet $LCTL mark "$facet REPLAY BARRIER on ${!svc}" + $LCTL mark "local REPLAY BARRIER on ${!svc}" +} + mds_evict_client() { UUID=`lctl get_param -n mdc.${mds1_svc}-mdc-*.uuid` do_facet mds1 "lctl set_param -n mdt.${mds1_svc}.evict_client $UUID" diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index f5a777b..a48ed1e 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -234,9 +234,9 @@ void lustre_assert_wire_constants(void) (long long)LCK_NL); LASSERTF(LCK_GROUP == 64, " found %lld\n", (long long)LCK_GROUP); - LASSERTF(LCK_MAXMODE == 65, " found %lld\n", + LASSERTF(LCK_MAXMODE == 129, " found %lld\n", (long long)LCK_MAXMODE); - LASSERTF(LCK_MODE_NUM == 7, " found %lld\n", + LASSERTF(LCK_MODE_NUM == 8, " found %lld\n", (long long)LCK_MODE_NUM); CLASSERT(LDLM_PLAIN == 10); CLASSERT(LDLM_EXTENT == 11); -- 1.8.3.1