From 8c82ab5cd92ee70a4cda2fe6437861e56f3fe3d5 Mon Sep 17 00:00:00 2001 From: vitaly Date: Tue, 31 Jul 2007 11:17:54 +0000 Subject: [PATCH] Branch HEAD b=11829 i=huanghua i=tappro early lock cancel for cmd --- lustre/ChangeLog | 7 + lustre/include/lustre/lustre_idl.h | 43 ++- lustre/include/lustre_dlm.h | 51 ++-- lustre/include/obd.h | 6 +- lustre/include/obd_class.h | 6 +- lustre/ldlm/ldlm_extent.c | 2 - lustre/ldlm/ldlm_flock.c | 2 +- lustre/ldlm/ldlm_inodebits.c | 2 - lustre/ldlm/ldlm_internal.h | 11 +- lustre/ldlm/ldlm_lock.c | 47 +--- lustre/ldlm/ldlm_lockd.c | 160 +++++++---- lustre/ldlm/ldlm_plain.c | 2 - lustre/ldlm/ldlm_request.c | 539 ++++++++++++++++++++++++++++--------- lustre/ldlm/ldlm_resource.c | 47 +++- lustre/liblustre/file.c | 1 + lustre/llite/dcache.c | 9 +- lustre/llite/dir.c | 4 +- lustre/llite/file.c | 12 +- lustre/llite/llite_internal.h | 4 +- lustre/llite/llite_lib.c | 13 +- lustre/llite/namei.c | 44 ++- lustre/lmv/lmv_obd.c | 175 +++++++++++- lustre/mdc/mdc_internal.h | 5 +- lustre/mdc/mdc_lib.c | 4 +- lustre/mdc/mdc_locks.c | 61 +++-- lustre/mdc/mdc_reint.c | 171 ++++++++++-- lustre/mdc/mdc_request.c | 1 + lustre/mds/handler.c | 2 +- lustre/mdt/mdt_handler.c | 2 +- lustre/mdt/mdt_lib.c | 64 +++-- lustre/mdt/mdt_reint.c | 15 ++ lustre/osc/osc_request.c | 48 +++- lustre/ost/ost_handler.c | 9 + lustre/ptlrpc/layout.c | 19 +- lustre/ptlrpc/pack_generic.c | 5 +- lustre/ptlrpc/wiretest.c | 37 ++- lustre/tests/sanity-lmv.sh | 46 ++++ lustre/tests/sanity.sh | 122 +++++++++ lustre/utils/wirecheck.c | 10 +- lustre/utils/wiretest.c | 37 ++- 40 files changed, 1415 insertions(+), 430 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 91cda98..351fbc5 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -318,6 +318,13 @@ Description: Short directio read returns full requested size rather than Details : Direct I/O operations should return actual amount of bytes transferred rather than requested size. 
+Severity : enhancement +Bugzilla : 10589 +Description: metadata RPC reduction (e.g. for rm performance) +Details : decrease the amount of synchronous RPC between clients and servers + by canceling conflicting lock before the operation on the client side + and packing their handles into the main operation RPC to server. + -------------------------------------------------------------------------------- 2007-05-03 Cluster File Systems, Inc. diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 86c6321..56910b0 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -522,6 +522,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_CONNECT_OSS_CAPA 0x00200000ULL /* OSS capability */ #define OBD_CONNECT_MDS_MDS 0x00400000ULL /* MDS-MDS connection*/ #define OBD_CONNECT_SOM 0x00800000ULL /* SOM feature */ +#define OBD_CONNECT_CANCELSET 0x01000000ULL /* Early batched cancels. */ #define OBD_CONNECT_REAL 0x00000200ULL /* real connection */ /* also update obd_connect_names[] for lprocfs_rd_connect_flags() * and lustre/utils/wirecheck.c */ @@ -533,12 +534,12 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_LCL_CLIENT | \ OBD_CONNECT_RMT_CLIENT | \ OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \ - OBD_CONNECT_MDS_MDS) + OBD_CONNECT_MDS_MDS | OBD_CONNECT_CANCELSET) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \ OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64 | \ - OBD_CONNECT_OSS_CAPA) + OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET) #define ECHO_CONNECT_SUPPORTED (0) #define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION) @@ -551,6 +552,9 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_OCD_VERSION_PATCH(version) ((int)((version)>>8)&255) #define OBD_OCD_VERSION_FIX(version) ((int)(version)&255) +#define 
exp_connect_cancelset(exp) \ + ((exp) ? (exp)->exp_connect_flags & OBD_CONNECT_CANCELSET : 0) + /* This structure is used for both request and reply. * * If we eventually have separate connect data for different types, which we @@ -955,14 +959,21 @@ struct mds_status_req { extern void lustre_swab_mds_status_req (struct mds_status_req *r); /* mdt_thread_info.mti_flags. */ -enum mdt_ioepoch_flags { +enum md_op_flags { /* The flag indicates Size-on-MDS attributes are changed. */ - MF_SOM_CHANGE = (1 << 0), + MF_SOM_CHANGE = (1 << 0), /* Flags indicates an epoch opens or closes. */ - MF_EPOCH_OPEN = (1 << 1), - MF_EPOCH_CLOSE = (1 << 2), + MF_EPOCH_OPEN = (1 << 1), + MF_EPOCH_CLOSE = (1 << 2), + MF_MDC_CANCEL_FID1 = (1 << 3), + MF_MDC_CANCEL_FID2 = (1 << 4), + MF_MDC_CANCEL_FID3 = (1 << 5), + MF_MDC_CANCEL_FID4 = (1 << 6), }; +#define MF_SOM_LOCAL_FLAGS (MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID2 | \ + MF_MDC_CANCEL_FID3 | MF_MDC_CANCEL_FID4) + #define MDS_BFLAG_UNCOMMITTED_WRITES 0x1 #define MDS_BFLAG_EXT_FLAGS 0x80000000 /* == EXT3_RESERVED_FL */ @@ -1559,16 +1570,29 @@ struct ldlm_lock_desc { extern void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l); +#define LDLM_LOCKREQ_HANDLES 2 +#define LDLM_ENQUEUE_CANCEL_OFF 1 + struct ldlm_request { __u32 lock_flags; - __u32 lock_padding; /* also fix lustre_swab_ldlm_request */ + __u32 lock_count; struct ldlm_lock_desc lock_desc; - struct lustre_handle lock_handle1; - struct lustre_handle lock_handle2; + struct lustre_handle lock_handle[LDLM_LOCKREQ_HANDLES]; }; extern void lustre_swab_ldlm_request (struct ldlm_request *rq); +/* If LDLM_ENQUEUE, 1 slot is already occupied, 1 is available. + * Otherwise, 2 are available. */ +#define ldlm_request_bufsize(count,type) \ +({ \ + int _avail = LDLM_LOCKREQ_HANDLES; \ + _avail -= (type == LDLM_ENQUEUE ? LDLM_ENQUEUE_CANCEL_OFF : 0); \ + sizeof(struct ldlm_request) + \ + (count > _avail ? 
count - _avail : 0) * \ + sizeof(struct lustre_handle); \ +}) + struct ldlm_reply { __u32 lock_flags; __u32 lock_padding; /* also fix lustre_swab_ldlm_reply */ @@ -1580,7 +1604,6 @@ struct ldlm_reply { extern void lustre_swab_ldlm_reply (struct ldlm_reply *r); - /* * Opcodes for mountconf (mgs and mgc) */ diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index b263623..c3fe0b0 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -28,6 +28,7 @@ struct obd_device; #define OBD_LDLM_DEVICENAME "ldlm" #define LDLM_DEFAULT_LRU_SIZE (100 * smp_num_cpus) +#define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000)) typedef enum { ELDLM_OK = 0, @@ -131,6 +132,8 @@ typedef enum { #define LDLM_FL_LOCK_PROTECT 0x8000000 #define LDLM_FL_LOCK_PROTECT_BIT 27 +/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */ +#define LDLM_FL_ASYNC 0x20000000 /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. */ @@ -149,7 +152,7 @@ typedef enum { #define LCK_COMPAT_PR (LCK_COMPAT_PW | LCK_PR) #define LCK_COMPAT_CW (LCK_COMPAT_PW | LCK_CW) #define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW) -#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX) +#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX | LCK_GROUP) #define LCK_COMPAT_GROUP (LCK_GROUP | LCK_NL) extern ldlm_mode_t lck_compat_array[]; @@ -226,6 +229,7 @@ struct ldlm_namespace { spinlock_t ns_unused_lock; unsigned int ns_max_unused; + unsigned int ns_max_age; cfs_time_t ns_next_dump; /* next debug dump, jiffies */ atomic_t ns_locks; @@ -262,13 +266,6 @@ struct ldlm_lock { /* ldlm_lock_change_resource() can change this */ struct ldlm_resource *l_resource; - /* set once, no need to protect it */ - struct ldlm_lock *l_parent; - - /* protected by ns_hash_lock */ - struct list_head l_children; - struct list_head l_childof; - /* protected by ns_hash_lock. 
FIXME */ struct list_head l_lru; @@ -327,8 +324,6 @@ struct ldlm_lock { __u32 l_pid; /* pid which created this lock */ __u32 l_pidb; /* who holds LOCK_PROTECT_BIT */ - struct list_head l_tmp; - /* for ldlm_add_ast_work_item() */ struct list_head l_bl_ast; struct list_head l_cp_ast; @@ -360,10 +355,6 @@ struct ldlm_resource { struct semaphore lr_lvb_sem; __u32 lr_lvb_len; void *lr_lvb_data; - - /* lr_tmp holds a list head temporarily, during the building of a work - * queue. see ldlm_add_ast_work_item and ldlm_run_ast_work */ - void *lr_tmp; }; struct ldlm_ast_work { @@ -463,6 +454,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback, ldlm_blocking_callback, ldlm_glimpse_callback); int ldlm_handle_convert(struct ptlrpc_request *req); int ldlm_handle_cancel(struct ptlrpc_request *req); +int ldlm_request_cancel(struct ptlrpc_request *req, + const struct ldlm_request *dlm_req, int first); int ldlm_del_waiting_lock(struct ldlm_lock *lock); int ldlm_refresh_waiting_lock(struct ldlm_lock *lock); void ldlm_revoke_export_locks(struct obd_export *exp); @@ -499,6 +492,18 @@ do { \ lock; \ }) +#define ldlm_lock_list_put(head, member, count) \ +({ \ + struct ldlm_lock *_lock, *_next; \ + int c = count; \ + list_for_each_entry_safe(_lock, _next, head, member) { \ + list_del_init(&_lock->member); \ + LDLM_LOCK_PUT(_lock); \ + if (--c == 0) \ + break; \ + } \ +}) + struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock); void ldlm_lock_put(struct ldlm_lock *lock); void ldlm_lock_destroy(struct ldlm_lock *lock); @@ -570,6 +575,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, ldlm_glimpse_callback glimpse, void *data, void *lvb, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh, int async); +struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, + int bufcount, int *size, + struct list_head *head, int count); int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req, const 
struct ldlm_request *dlm_req, const struct ldlm_callback_suite *cbs); @@ -595,9 +603,22 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, int ldlm_cli_cancel(struct lustre_handle *lockh); int ldlm_cli_cancel_unused(struct ldlm_namespace *, const struct ldlm_res_id *, int flags, void *opaque); +int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, + const struct ldlm_res_id *res_id, + ldlm_policy_data_t *policy, + int mode, int flags, void *opaque); +int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *head, + int count, int flags); int ldlm_cli_join_lru(struct ldlm_namespace *, const struct ldlm_res_id *, int join); - +int ldlm_cancel_resource_local(struct ldlm_resource *res, + struct list_head *cancels, + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int lock_flags, + int flags, void *opaque); +int ldlm_cli_cancel_list(struct list_head *head, int count, + struct ptlrpc_request *req, int off, int flags); + /* mds/handler.c */ /* This has to be here because recursive inclusion sucks. */ int intent_disposition(struct ldlm_reply *rep, int flag); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 1dd41e4..56d8918 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -966,6 +966,8 @@ struct lu_context; struct md_op_data { struct lu_fid op_fid1; /* operation fid1 (usualy parent) */ struct lu_fid op_fid2; /* operation fid2 (usualy child) */ + struct lu_fid op_fid3; /* 2 extra fids to find conflicting */ + struct lu_fid op_fid4; /* to the operation locks. */ mdsno_t op_mds; /* what mds server open will go to */ struct lustre_handle op_handle; __u64 op_mod_time; @@ -978,6 +980,7 @@ struct md_op_data { __u32 op_fsuid; __u32 op_fsgid; __u32 op_cap; + void *op_data; /* iattr fields and blocks. 
*/ struct iattr op_attr; @@ -1300,7 +1303,8 @@ struct md_ops { struct lustre_handle *); int (*m_cancel_unused)(struct obd_export *, const struct lu_fid *, - int flags, void *opaque); + ldlm_policy_data_t *, ldlm_mode_t, int flags, + void *opaque); int (*m_renew_capa)(struct obd_export *, struct obd_capa *oc, renew_capa_cb_t cb); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index e5e0ff8..fb02307 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1921,7 +1921,8 @@ static inline int md_set_lock_data(struct obd_export *exp, static inline int md_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, - int flags, void *opaque) + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int flags, void *opaque) { int rc; ENTRY; @@ -1929,7 +1930,8 @@ static inline int md_cancel_unused(struct obd_export *exp, EXP_CHECK_MD_OP(exp, cancel_unused); EXP_MD_COUNTER_INCREMENT(exp, cancel_unused); - rc = MDP(exp->exp_obd, cancel_unused)(exp, fid, flags, opaque); + rc = MDP(exp->exp_obd, cancel_unused)(exp, fid, policy, mode, + flags, opaque); RETURN(rc); } diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index d84da7f..7d9e1bf 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -347,12 +347,10 @@ destroylock: /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent - * - the caller has already initialized req->lr_tmp * - must call this function with the ns lock held * * If first_enq is 1 (ie, called from ldlm_lock_enqueue): * - blocking ASTs have not been sent - * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err, struct list_head *work_list) diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 0db3d41..3a45d01 100644 --- a/lustre/ldlm/ldlm_flock.c +++ 
b/lustre/ldlm/ldlm_flock.c @@ -334,7 +334,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, /* XXX - if ldlm_lock_new() can sleep we should * release the ns_lock, allocate the new lock, * and restart processing this lock. */ - new2 = ldlm_lock_create(ns, NULL, &res->lr_name, LDLM_FLOCK, + new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK, lock->l_granted_mode, NULL, NULL, NULL, NULL, 0); if (!new2) { diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index ac82ab0..97f941f 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -112,12 +112,10 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent - * - the caller has already initialized req->lr_tmp * - must call this function with the ns lock held * * If first_enq is 1 (ie, called from ldlm_lock_enqueue): * - blocking ASTs have not been sent - * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err, diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 4a900d9..9b90f45 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -8,7 +8,12 @@ typedef enum { LDLM_SYNC, } ldlm_sync_t; +/* Cancel lru flag, it indicates we cancel aged locks. 
*/ +#define LDLM_CANCEL_AGED 0x00000001 + int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync); +int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, + int count, int max, int flags); /* ldlm_resource.c */ int ldlm_resource_putref_locked(struct ldlm_resource *res); @@ -18,9 +23,7 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original, /* ldlm_lock.c */ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list); struct ldlm_lock * -ldlm_lock_create(struct ldlm_namespace *ns, - const struct lustre_handle *parent_lock_handle, - const struct ldlm_res_id *, +ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *, ldlm_type_t type, ldlm_mode_t, ldlm_blocking_callback, ldlm_completion_callback, ldlm_glimpse_callback, void *data, __u32 lvb_len); @@ -39,7 +42,7 @@ void ldlm_lock_destroy_nolock(struct ldlm_lock *lock); /* ldlm_lockd.c */ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock); + struct ldlm_lock *lock, int flags); void ldlm_handle_bl_callback(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 13f26ab..2f1450b 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -149,9 +149,6 @@ void ldlm_lock_put(struct ldlm_lock *lock) LASSERT(list_empty(&lock->l_res_link)); LASSERT(list_empty(&lock->l_pending_chain)); - if (lock->l_parent) - LDLM_LOCK_PUT(lock->l_parent); - atomic_dec(&res->lr_namespace->ns_locks); ldlm_resource_putref(res); lock->l_resource = NULL; @@ -203,12 +200,6 @@ int ldlm_lock_destroy_internal(struct ldlm_lock *lock) { ENTRY; - if (!list_empty(&lock->l_children)) { - LDLM_ERROR(lock, "still has children (%p)!", - lock->l_children.next); - ldlm_lock_dump(D_ERROR, lock, 0); - LBUG(); - } if (lock->l_readers || lock->l_writers) { LDLM_ERROR(lock, "lock still has references"); ldlm_lock_dump(D_ERROR, lock, 0); @@ 
-288,8 +279,7 @@ static void lock_handle_addref(void *lock) * after return, ldlm_*_put the resource and parent * returns: lock with refcount 2 - one for current caller and one for remote */ -static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, - struct ldlm_resource *resource) +static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) { struct ldlm_lock *lock; ENTRY; @@ -304,12 +294,10 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, lock->l_resource = ldlm_resource_getref(resource); atomic_set(&lock->l_refc, 2); - CFS_INIT_LIST_HEAD(&lock->l_children); CFS_INIT_LIST_HEAD(&lock->l_res_link); CFS_INIT_LIST_HEAD(&lock->l_lru); CFS_INIT_LIST_HEAD(&lock->l_export_chain); CFS_INIT_LIST_HEAD(&lock->l_pending_chain); - CFS_INIT_LIST_HEAD(&lock->l_tmp); CFS_INIT_LIST_HEAD(&lock->l_bl_ast); CFS_INIT_LIST_HEAD(&lock->l_cp_ast); cfs_waitq_init(&lock->l_waitq); @@ -321,14 +309,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, lock->l_sl_policy.next = NULL; atomic_inc(&resource->lr_namespace->ns_locks); - - if (parent != NULL) { - spin_lock(&resource->lr_namespace->ns_hash_lock); - lock->l_parent = LDLM_LOCK_GET(parent); - list_add(&lock->l_childof, &parent->l_children); - spin_unlock(&resource->lr_namespace->ns_hash_lock); - } - CFS_INIT_LIST_HEAD(&lock->l_handle.h_link); class_handle_hash(&lock->l_handle, lock_handle_addref); @@ -606,7 +586,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_lock_remove_from_lru(lock); unlock_res_and_lock(lock); if ((lock->l_flags & LDLM_FL_ATOMIC_CB) || - ldlm_bl_to_thread(ns, NULL, lock) != 0) + ldlm_bl_to_thread(ns, NULL, lock, 0) != 0) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT && !lock->l_readers && !lock->l_writers && @@ -615,12 +595,16 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) * reference, put it on the LRU. 
*/ LASSERT(list_empty(&lock->l_lru)); LASSERT(ns->ns_nr_unused >= 0); + lock->l_last_used = cfs_time_current(); spin_lock(&ns->ns_unused_lock); list_add_tail(&lock->l_lru, &ns->ns_unused_list); ns->ns_nr_unused++; spin_unlock(&ns->ns_unused_lock); unlock_res_and_lock(lock); - ldlm_cancel_lru(ns, LDLM_ASYNC); + /* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported + * by the server, otherwise, it is done on enqueue. */ + if (!exp_connect_cancelset(lock->l_conn_export)) + ldlm_cancel_lru(ns, LDLM_ASYNC); } else { unlock_res_and_lock(lock); } @@ -1084,7 +1068,6 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, /* Returns a referenced lock */ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, - const struct lustre_handle *parent_lock_handle, const struct ldlm_res_id *res_id, ldlm_type_t type, ldlm_mode_t mode, @@ -1093,24 +1076,16 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, ldlm_glimpse_callback glimpse, void *data, __u32 lvb_len) { - struct ldlm_resource *res, *parent_res = NULL; - struct ldlm_lock *lock, *parent_lock = NULL; + struct ldlm_lock *lock; + struct ldlm_resource *res; ENTRY; - if (parent_lock_handle) { - parent_lock = ldlm_handle2lock(parent_lock_handle); - if (parent_lock) - parent_res = parent_lock->l_resource; - } - - res = ldlm_resource_get(ns, parent_res, res_id, type, 1); + res = ldlm_resource_get(ns, NULL, res_id, type, 1); if (res == NULL) RETURN(NULL); - lock = ldlm_lock_new(parent_lock, res); + lock = ldlm_lock_new(res); ldlm_resource_putref(res); - if (parent_lock != NULL) - LDLM_LOCK_PUT(parent_lock); if (lock == NULL) RETURN(NULL); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 59c7cfa..af90be5 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -97,6 +97,7 @@ struct ldlm_bl_work_item { struct ldlm_namespace *blwi_ns; struct ldlm_lock_desc blwi_ld; struct ldlm_lock *blwi_lock; + int blwi_flags; }; #ifdef __KERNEL__ @@ -545,7 +546,7 @@ int 
ldlm_server_blocking_ast(struct ldlm_lock *lock, instant_cancel = 1; body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; body->lock_desc = *desc; body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS); @@ -621,7 +622,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) RETURN(-ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; body->lock_flags = flags; ldlm_lock2desc(lock, &body->lock_desc); @@ -703,7 +704,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) RETURN(-ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); lock_res_and_lock(lock); @@ -778,6 +779,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, LDLM_DEBUG_NOLOCK("server-side enqueue handler START"); + ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF); flags = dlm_req->lock_flags; LASSERT(req->rq_export); @@ -835,7 +837,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, if (unlikely(flags & LDLM_FL_REPLAY)) { lock = find_existing_lock(req->rq_export, - &dlm_req->lock_handle1); + &dlm_req->lock_handle[0]); if (lock != NULL) { DEBUG_REQ(D_HA, req, "found existing lock cookie "LPX64, lock->l_handle.h_cookie); @@ -844,8 +846,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, } /* The lock's callback data might be set in the policy function */ - lock = ldlm_lock_create(ns, &dlm_req->lock_handle2, - &dlm_req->lock_desc.l_resource.lr_name, + lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name, dlm_req->lock_desc.l_resource.lr_type, dlm_req->lock_desc.l_req_mode, cbs->lcs_blocking, cbs->lcs_completion, @@ -855,7 +856,7 @@ int 
ldlm_handle_enqueue0(struct ldlm_namespace *ns, GOTO(out, rc = -ENOMEM); do_gettimeofday(&lock->l_enqueued_time); - lock->l_remote_handle = dlm_req->lock_handle1; + lock->l_remote_handle = dlm_req->lock_handle[0]; LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); @@ -1069,7 +1070,7 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, sizeof(*dlm_rep)); dlm_rep->lock_flags = dlm_req->lock_flags; - lock = ldlm_handle2lock(&dlm_req->lock_handle1); + lock = ldlm_handle2lock(&dlm_req->lock_handle[0]); if (!lock) { req->rq_status = EINVAL; } else { @@ -1116,11 +1117,73 @@ int ldlm_handle_convert(struct ptlrpc_request *req) return rc; } +/* Cancel all the locks, which handles are packed into ldlm_request */ +int ldlm_request_cancel(struct ptlrpc_request *req, + const struct ldlm_request *dlm_req, int first) +{ + struct ldlm_resource *res, *pres = NULL; + struct ldlm_lock *lock; + int i, count, done = 0; + ENTRY; + + count = dlm_req->lock_count ? dlm_req->lock_count : 1; + if (first >= count) + RETURN(0); + + /* There is no lock on the server at the replay time, + * skip lock cancelling to make replay tests to pass. 
*/ + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + RETURN(0); + + LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks", + count - first); + for (i = first; i < count; i++) { + lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); + if (!lock) { + LDLM_DEBUG_NOLOCK("server-side cancel handler stale " + "lock (cookie "LPU64")", + dlm_req->lock_handle[i].cookie); + continue; + } + + res = lock->l_resource; + done++; + ldlm_lock_cancel(lock); + if (ldlm_del_waiting_lock(lock)) + CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock); + + if (res != pres) { + if (pres != NULL) { + if (pres->lr_namespace->ns_lvbo && + pres->lr_namespace->ns_lvbo->lvbo_update) { + (void)pres->lr_namespace->ns_lvbo-> + lvbo_update(pres, NULL, 0, 1); + } + ldlm_reprocess_all(pres); + ldlm_resource_putref(pres); + } + if (res != NULL) + ldlm_resource_getref(res); + pres = res; + } + LDLM_LOCK_PUT(lock); + } + if (pres != NULL) { + if (pres->lr_namespace->ns_lvbo && + pres->lr_namespace->ns_lvbo->lvbo_update) { + (void)pres->lr_namespace->ns_lvbo-> + lvbo_update(pres, NULL, 0, 1); + } + ldlm_reprocess_all(pres); + ldlm_resource_putref(pres); + } + LDLM_DEBUG_NOLOCK("server-side cancel handler END"); + RETURN(done); +} + int ldlm_handle_cancel(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock *lock; - struct ldlm_resource *res; int rc; ENTRY; @@ -1140,42 +1203,13 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) CERROR("out of memory\n"); RETURN(-ENOMEM); } - - lock = ldlm_handle2lock(&dlm_req->lock_handle1); - if (!lock) { - CERROR("received cancel for unknown lock cookie "LPX64 - " from client %s id %s\n", - dlm_req->lock_handle1.cookie, - req->rq_export->exp_client_uuid.uuid, - libcfs_id2str(req->rq_peer)); - LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock " - "(cookie "LPU64")", - dlm_req->lock_handle1.cookie); + + if (!ldlm_request_cancel(req, dlm_req, 0)) req->rq_status = ESTALE; - } else { - LDLM_DEBUG(lock, "server-side 
cancel handler START"); - res = lock->l_resource; - if (res && res->lr_namespace->ns_lvbo && - res->lr_namespace->ns_lvbo->lvbo_update) { - (void)res->lr_namespace->ns_lvbo->lvbo_update - (res, NULL, 0, 0); - } - - ldlm_lock_cancel(lock); - if (ldlm_del_waiting_lock(lock)) - CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock); - req->rq_status = rc; - } if (ptlrpc_reply(req) != 0) LBUG(); - if (lock) { - ldlm_reprocess_all(lock->l_resource); - LDLM_DEBUG(lock, "server-side cancel handler END"); - LDLM_LOCK_PUT(lock); - } - RETURN(0); } @@ -1302,9 +1336,10 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, if (lock->l_granted_mode == LCK_PW && !lock->l_readers && !lock->l_writers && cfs_time_after(cfs_time_current(), - cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) { + cfs_time_add(lock->l_last_used, + cfs_time_seconds(10)))) { unlock_res_and_lock(lock); - if (ldlm_bl_to_thread(ns, NULL, lock)) + if (ldlm_bl_to_thread(ns, NULL, lock, 0)) ldlm_handle_bl_callback(ns, NULL, lock); EXIT; @@ -1327,7 +1362,7 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) } int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock) + struct ldlm_lock *lock, int flags) { #ifdef __KERNEL__ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; @@ -1342,6 +1377,7 @@ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, if (ld != NULL) blwi->blwi_ld = *ld; blwi->blwi_lock = lock; + blwi->blwi_flags = flags; spin_lock(&blp->blp_lock); list_add_tail(&blwi->blwi_entry, &blp->blp_list); @@ -1382,7 +1418,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) lustre_swab_ldlm_request); if (dlm_req != NULL) CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n", - dlm_req->lock_handle1.cookie); + dlm_req->lock_handle[0].cookie); ldlm_callback_reply(req, -ENOTCONN); RETURN(0); @@ -1454,10 +1490,10 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) RETURN (0); } - lock = 
ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1); + lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]); if (!lock) { CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n", - dlm_req->lock_handle1.cookie); + dlm_req->lock_handle[0].cookie); ldlm_callback_reply(req, -EINVAL); RETURN(0); } @@ -1481,7 +1517,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) CDEBUG(D_INODE, "blocking ast\n"); if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) ldlm_callback_reply(req, 0); - if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock)) + if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0)) ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock); break; case LDLM_CP_CALLBACK: @@ -1522,8 +1558,8 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) sizeof(*dlm_req), lustre_swab_ldlm_request); if (dlm_req != NULL) - ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1); - + ldlm_lock_dump_handle(D_ERROR, + &dlm_req->lock_handle[0]); ldlm_callback_reply(req, -ENOTCONN); RETURN(0); } @@ -1659,8 +1695,22 @@ static int ldlm_bl_thread_main(void *arg) if (blwi->blwi_ns == NULL) break; - ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, - blwi->blwi_lock); + if (blwi->blwi_flags == LDLM_FL_CANCELING) { + /* The special case when we cancel locks in lru + * asynchronously, then we first remove the lock from + * l_bl_ast explicitly in ldlm_cancel_lru before + * sending it to this thread. Thus lock is marked + * LDLM_FL_CANCELING, and already cancelled locally. 
*/ + CFS_LIST_HEAD(head); + LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast)); + list_add(&blwi->blwi_lock->l_bl_ast, &head); + ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export, + &head, 1, 0); + LDLM_LOCK_PUT(blwi->blwi_lock); + } else { + ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, + blwi->blwi_lock); + } OBD_FREE(blwi, sizeof(*blwi)); } @@ -1940,18 +1990,23 @@ EXPORT_SYMBOL(ldlm_completion_ast); EXPORT_SYMBOL(ldlm_blocking_ast); EXPORT_SYMBOL(ldlm_glimpse_ast); EXPORT_SYMBOL(ldlm_expired_completion_wait); +EXPORT_SYMBOL(ldlm_prep_enqueue_req); EXPORT_SYMBOL(ldlm_cli_convert); EXPORT_SYMBOL(ldlm_cli_enqueue); EXPORT_SYMBOL(ldlm_cli_enqueue_fini); EXPORT_SYMBOL(ldlm_cli_enqueue_local); EXPORT_SYMBOL(ldlm_cli_cancel); EXPORT_SYMBOL(ldlm_cli_cancel_unused); +EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource); +EXPORT_SYMBOL(ldlm_cli_cancel_req); EXPORT_SYMBOL(ldlm_cli_join_lru); EXPORT_SYMBOL(ldlm_replay_locks); EXPORT_SYMBOL(ldlm_resource_foreach); EXPORT_SYMBOL(ldlm_namespace_foreach); EXPORT_SYMBOL(ldlm_namespace_foreach_res); EXPORT_SYMBOL(ldlm_resource_iterate); +EXPORT_SYMBOL(ldlm_cancel_resource_local); +EXPORT_SYMBOL(ldlm_cli_cancel_list); /* ldlm_lockd.c */ EXPORT_SYMBOL(ldlm_server_blocking_ast); @@ -1960,6 +2015,7 @@ EXPORT_SYMBOL(ldlm_server_glimpse_ast); EXPORT_SYMBOL(ldlm_handle_enqueue); EXPORT_SYMBOL(ldlm_handle_enqueue0); EXPORT_SYMBOL(ldlm_handle_cancel); +EXPORT_SYMBOL(ldlm_request_cancel); EXPORT_SYMBOL(ldlm_handle_convert); EXPORT_SYMBOL(ldlm_handle_convert0); EXPORT_SYMBOL(ldlm_del_waiting_lock); diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index 71351d2..c6b2c89 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -88,12 +88,10 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent - * - the caller has already initialized req->lr_tmp * - must call this function with the resource 
lock held * * If first_enq is 1 (ie, called from ldlm_lock_enqueue): * - blocking ASTs have not been sent - * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the resource lock held */ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err, struct list_head *work_list) diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index a9bd553..c781b30 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -246,7 +246,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, LBUG(); } - lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, + lock = ldlm_lock_create(ns, res_id, type, mode, blocking, completion, glimpse, data, lvb_len); if (unlikely(!lock)) GOTO(out_nolock, err = -ENOMEM); @@ -462,6 +462,69 @@ cleanup: return rc; } +/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into + * a single page on the send/receive side. XXX: 512 should be changed + * to more adequate value. */ +#define ldlm_req_handles_avail(exp, size, bufcount, off) \ +({ \ + int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \ + int _s = size[DLM_LOCKREQ_OFF]; \ + size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \ + _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \ + bufcount, size); \ + _avail /= sizeof(struct lustre_handle); \ + _avail += LDLM_LOCKREQ_HANDLES - off; \ + size[DLM_LOCKREQ_OFF] = _s; \ + _avail; \ +}) + +/* Cancel lru locks and pack them into the enqueue request. Pack there the given + * @count locks in @cancel. 
*/ +struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, + int bufcount, int *size, + struct list_head *cancels, + int count) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_request *dlm = NULL; + struct ptlrpc_request *req; + CFS_LIST_HEAD(head); + ENTRY; + + if (cancels == NULL) + cancels = &head; + if (exp_connect_cancelset(exp)) { + /* Estimate the amount of available space in the request. */ + int avail = ldlm_req_handles_avail(exp, size, bufcount, + LDLM_ENQUEUE_CANCEL_OFF); + LASSERT(avail >= count); + + /* Cancel lru locks here _only_ if the server supports + * EARLY_CANCEL. Otherwise we have to send extra CANCEL + * rpc right on enqueue, what will make it slower, vs. + * asynchronous rpc in blocking thread. */ + count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count, + LDLM_CANCEL_AGED); + size[DLM_LOCKREQ_OFF] = + ldlm_request_bufsize(count, LDLM_ENQUEUE); + } + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, + LDLM_ENQUEUE, bufcount, size, NULL); + if (exp_connect_cancelset(exp) && req) { + dlm = lustre_msg_buf(req->rq_reqmsg, + DLM_LOCKREQ_OFF, sizeof(*dlm)); + /* Skip first lock handler in ldlm_request_pack(), this method + * will incrment @lock_count according to the lock handle amount + * actually written to the buffer. */ + dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF; + } + if (req) + ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0); + else + ldlm_lock_list_put(cancels, l_bl_ast, count); + RETURN(req); +} + /* If a request has some specific initialisation it is passed in @reqp, * otherwise it is created in ldlm_cli_enqueue. 
* @@ -500,7 +563,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_DEBUG(lock, "client-side enqueue START"); LASSERT(exp == lock->l_conn_export); } else { - lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, + lock = ldlm_lock_create(ns, res_id, type, mode, blocking, completion, glimpse, data, lvb_len); if (lock == NULL) RETURN(-ENOMEM); @@ -531,8 +594,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, /* lock not sent to server yet */ if (reqp == NULL || *reqp == NULL) { - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (req == NULL) { failed_lock_cleanup(ns, lock, lockh, mode); LDLM_LOCK_PUT(lock); @@ -543,7 +605,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, *reqp = req; } else { req = *reqp; - LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) == + LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >= sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n", DLM_LOCKREQ_OFF, lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF), @@ -558,7 +620,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); ldlm_lock2desc(lock, &body->lock_desc); body->lock_flags = *flags; - body->lock_handle1 = *lockh; + body->lock_handle[0] = *lockh; /* Continue as normal. 
*/ if (!req_passed_in) { @@ -652,7 +714,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) GOTO(out, rc = -ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; body->lock_desc.l_req_mode = new_mode; body->lock_flags = *flags; @@ -695,24 +757,15 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) return rc; } -int ldlm_cli_cancel(struct lustre_handle *lockh) +/* Cancel locks locally. + * Returns: 1 if there is a need to send a cancel RPC to server. 0 otherwise. */ +static int ldlm_cli_cancel_local(struct ldlm_lock *lock) { - struct ptlrpc_request *req; - struct ldlm_lock *lock; - struct ldlm_request *body; - int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(*body) }; int rc = 0; ENTRY; - - /* concurrent cancels on the same handle can happen */ - lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); - if (lock == NULL) - RETURN(0); if (lock->l_conn_export) { int local_only; - struct obd_import *imp; LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ @@ -722,25 +775,108 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); ldlm_cancel_callback(lock); unlock_res_and_lock(lock); - - if (local_only) { + + if (local_only) CDEBUG(D_INFO, "not sending request (at caller's " "instruction)\n"); - goto local_cancel; + else + rc = 1; + + ldlm_lock_cancel(lock); + } else { + if (lock->l_resource->lr_namespace->ns_client) { + LDLM_ERROR(lock, "Trying to cancel local lock"); + LBUG(); } + LDLM_DEBUG(lock, "server-side local cancel"); + ldlm_lock_cancel(lock); + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "server-side local cancel handler END"); + } + + RETURN(rc); +} + +/* Pack @count locks in @head into ldlm_request buffer at the offset @off, + of 
the request @req. */ +static void ldlm_cancel_pack(struct ptlrpc_request *req, int off, + struct list_head *head, int count) +{ + struct ldlm_request *dlm; + struct ldlm_lock *lock; + int max; + ENTRY; + + dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm)); + LASSERT(dlm != NULL); + + /* Check the room in the request buffer. */ + max = lustre_msg_buflen(req->rq_reqmsg, off) - + sizeof(struct ldlm_request); + max /= sizeof(struct lustre_handle); + max += LDLM_LOCKREQ_HANDLES; + LASSERT(max >= dlm->lock_count + count); + + /* XXX: it would be better to pack lock handles grouped by resource. + * so that the server cancel would call filter_lvbo_update() less + * frequently. */ + list_for_each_entry(lock, head, l_bl_ast) { + if (!count--) + break; + /* Pack the lock handle to the given request buffer. */ + LASSERT(lock->l_conn_export); + /* Cannot be set on a lock in a resource granted list.*/ + LASSERT(!(lock->l_flags & + (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK))); + /* If @lock is marked CANCEL_ON_BLOCK, cancel + * will not be sent in ldlm_cli_cancel(). It + * is used for liblustre clients, no cancel on + * block requests. However, even for liblustre + * clients, when the flag is set, batched cancel + * should be sent (what if no block rpc has + * come). To not send another separated rpc in + * this case, the caller pass CANCEL_ON_BLOCK + * flag to ldlm_cli_cancel_unused_resource(). */ + dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle; + } + EXIT; +} + +/* Prepare and send a batched cancel rpc, it will include count lock handles + * of locks given in @head. 
*/ +int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, + int count, int flags) +{ + struct ptlrpc_request *req = NULL; + struct ldlm_request *body; + int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_LOCKREQ_OFF] = sizeof(*body) }; + struct obd_import *imp; + int free, sent = 0; + int rc = 0; + ENTRY; + + LASSERT(exp != NULL); + LASSERT(count > 0); - restart: - imp = class_exp2cliimp(lock->l_conn_export); + free = ldlm_req_handles_avail(exp, size, 2, 0); + if (count > free) + count = free; + + size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL); + while (1) { + imp = class_exp2cliimp(exp); if (imp == NULL || imp->imp_invalid) { CDEBUG(D_HA, "skipping cancel on invalid import %p\n", imp); - goto local_cancel; + break; } req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); + req->rq_no_resend = 1; /* XXX FIXME bug 249 */ @@ -749,85 +885,112 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count); ptlrpc_req_set_repsize(req, 1, NULL); - rc = ptlrpc_queue_wait(req); - + if (flags & LDLM_FL_ASYNC) { + ptlrpcd_add_req(req); + sent = count; + GOTO(out, 0); + } else { + rc = ptlrpc_queue_wait(req); + } if (rc == ESTALE) { - CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync " - "-- not fatal, flags %x\n", + CDEBUG(D_DLMTRACE, "client/server (nid %s) " + "out of sync -- not fatal\n", libcfs_nid2str(req->rq_import-> - imp_connection->c_peer.nid), - lock->l_flags); + imp_connection->c_peer.nid)); } else if (rc == -ETIMEDOUT) { ptlrpc_req_finished(req); - GOTO(restart, rc); + continue; } else if (rc != ELDLM_OK) { CERROR("Got rc %d from cancel RPC: canceling " "anyway\n", rc); + break; } - - ptlrpc_req_finished(req); - local_cancel: - ldlm_lock_cancel(lock); - } else { - if 
(lock->l_resource->lr_namespace->ns_client) { - LDLM_ERROR(lock, "Trying to cancel local lock"); - LBUG(); - } - LDLM_DEBUG(lock, "client-side local cancel"); - ldlm_lock_cancel(lock); - ldlm_reprocess_all(lock->l_resource); - LDLM_DEBUG(lock, "client-side local cancel handler END"); + sent = count; + break; } + ptlrpc_req_finished(req); EXIT; - out: - LDLM_LOCK_PUT(lock); - return rc; +out: + return sent ? sent : rc; } -/* when called with LDLM_ASYNC the blocking callback will be handled - * in a thread and this function will return after the thread has been - * asked to call the callback. when called with LDLM_SYNC the blocking - * callback will be performed in this function. */ -int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) +int ldlm_cli_cancel(struct lustre_handle *lockh) { - struct ldlm_lock *lock, *next; - int count, rc = 0; - CFS_LIST_HEAD(cblist); + struct ldlm_lock *lock; + CFS_LIST_HEAD(head); + int rc = 0; ENTRY; -#ifndef __KERNEL__ - sync = LDLM_SYNC; /* force to be sync in user space */ -#endif + /* concurrent cancels on the same handle can happen */ + lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); + if (lock == NULL) + RETURN(0); - spin_lock(&ns->ns_unused_lock); - count = ns->ns_nr_unused - ns->ns_max_unused; + rc = ldlm_cli_cancel_local(lock); + if (rc <= 0) + GOTO(out, rc); - if (count <= 0) { - spin_unlock(&ns->ns_unused_lock); - RETURN(0); - } + list_add(&lock->l_bl_ast, &head); + rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0); + EXIT; +out: + LDLM_LOCK_PUT(lock); + return rc < 0 ? rc : 0; +} +/* - Free space in lru for @count new locks, + * redundant unused locks are canceled locally; + * - also cancel locally unused aged locks; + * - do not cancel more than @max locks; + * - GET the found locks and add them into the @cancels list. + * + * A client lock can be added to the l_bl_ast list only when it is + * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing CANCEL. 
+ * There are the following use cases: ldlm_cancel_resource_local(), + * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check&set this + * flag properly. As any attempt to cancel a lock rely on this flag, + * l_bl_ast list is accessed later without any special locking. */ +int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, + int count, int max, int flags) +{ + cfs_time_t cur = cfs_time_current(); + struct ldlm_lock *lock, *next; + int rc, added = 0, left; + ENTRY; + + spin_lock(&ns->ns_unused_lock); + count += ns->ns_nr_unused - ns->ns_max_unused; while (!list_empty(&ns->ns_unused_list)) { struct list_head *tmp = ns->ns_unused_list.next; lock = list_entry(tmp, struct ldlm_lock, l_lru); - LASSERT(!lock->l_readers && !lock->l_writers); + + if (max && added >= max) + break; + + if ((added >= count) && + (!(flags & LDLM_CANCEL_AGED) || + cfs_time_before_64(cur, ns->ns_max_age + + lock->l_last_used))) + break; LDLM_LOCK_GET(lock); /* dropped by bl thread */ spin_unlock(&ns->ns_unused_lock); lock_res_and_lock(lock); - if (ldlm_lock_remove_from_lru(lock) == 0) { - /* other thread is removing lock from lru */ + if ((ldlm_lock_remove_from_lru(lock) == 0) || + (lock->l_flags & LDLM_FL_CANCELING)) { + /* other thread is removing lock from lru or + * somebody is already doing CANCEL. */ unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); spin_lock(&ns->ns_unused_lock); continue; - } + LASSERT(!lock->l_readers && !lock->l_writers); /* If we have chosen to canecl this lock voluntarily, we better send cancel notification to server, so that it frees @@ -841,59 +1004,85 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) * the lock can accumulate no more readers/writers. 
Since * readers and writers are already zero here, ldlm_lock_decref * won't see this flag and call l_blocking_ast */ - lock->l_flags |= LDLM_FL_CBPENDING; - + lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING; /* We can't re-add to l_lru as it confuses the refcounting in * ldlm_lock_remove_from_lru() if an AST arrives after we drop - * ns_lock below. We use l_tmp and can't use l_pending_chain as - * it is used both on server and client nevertheles bug 5666 + * ns_lock below. We use l_bl_ast and can't use l_pending_chain + * as it is used both on server and client nevertheles bug 5666 * says it is used only on server. --umka */ - list_add(&lock->l_tmp, &cblist); - unlock_res_and_lock(lock); - - LDLM_LOCK_GET(lock); /* to hold lock after bl thread */ - if (sync == LDLM_ASYNC && (ldlm_bl_to_thread(ns, NULL, lock) == 0)) { - lock_res_and_lock(lock); - list_del_init(&lock->l_tmp); - unlock_res_and_lock(lock); - } - LDLM_LOCK_PUT(lock); + LASSERT(list_empty(&lock->l_bl_ast)); + list_add(&lock->l_bl_ast, cancels); + unlock_res_and_lock(lock); spin_lock(&ns->ns_unused_lock); - - if (--count == 0) - break; + added++; } spin_unlock(&ns->ns_unused_lock); - list_for_each_entry_safe(lock, next, &cblist, l_tmp) { - list_del_init(&lock->l_tmp); - ldlm_handle_bl_callback(ns, NULL, lock); - } - - RETURN(rc); + /* Handle only @added inserted locks. */ + left = added; + list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { + if (left-- == 0) + break; + rc = ldlm_cli_cancel_local(lock); + if (rc == 0) { + /* CANCEL RPC should not be sent to server. */ + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); + added--; + } + } + RETURN(added); } -static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, - const struct ldlm_res_id *res_id, - int flags, void *opaque) +/* when called with LDLM_ASYNC the blocking callback will be handled + * in a thread and this function will return after the thread has been + * asked to call the callback. 
when called with LDLM_SYNC the blocking + * callback will be performed in this function. */ +int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) { - struct list_head *tmp, *next, list = CFS_LIST_HEAD_INIT(list); - struct ldlm_resource *res; - struct ldlm_lock *lock; + CFS_LIST_HEAD(cancels); + int count, rc; ENTRY; - res = ldlm_resource_get(ns, NULL, res_id, 0, 0); - if (res == NULL) { - /* This is not a problem. */ - CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]); - RETURN(0); +#ifndef __KERNEL__ + sync = LDLM_SYNC; /* force to be sync in user space */ +#endif + count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0); + if (sync == LDLM_ASYNC) { + struct ldlm_lock *lock, *next; + list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) { + /* Remove from the list to allow blocking thread to + * re-use l_bl_ast. */ + list_del_init(&lock->l_bl_ast); + rc = ldlm_bl_to_thread(ns, NULL, lock, + LDLM_FL_CANCELING); + if (rc) + list_add_tail(&lock->l_bl_ast, &next->l_bl_ast); + } } - lock_res(res); - list_for_each(tmp, &res->lr_granted) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + /* If some locks are left in the list in ASYNC mode, or + * this is SYNC mode, cancel the list. */ + ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF, 0); + RETURN(0); +} +/* Find and cancel locally unused locks found on resource, matched to the + * given policy, mode. GET the found locks and add them into the @cancels + * list. 
*/ +int ldlm_cancel_resource_local(struct ldlm_resource *res, + struct list_head *cancels, + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int lock_flags, + int flags, void *opaque) +{ + struct ldlm_lock *lock, *next; + int count = 0, left; + ENTRY; + + lock_res(res); + list_for_each_entry(lock, &res->lr_granted, l_res_link) { if (opaque != NULL && lock->l_ast_data != opaque) { LDLM_ERROR(lock, "data %p doesn't match opaque %p", lock->l_ast_data, opaque); @@ -909,34 +1098,130 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, continue; } + if (lockmode_compat(lock->l_granted_mode, mode)) + continue; + + /* If policy is given and this is IBITS lock, add to list only + * those locks that match by policy. */ + if (policy && (lock->l_resource->lr_type == LDLM_IBITS) && + !(lock->l_policy_data.l_inodebits.bits & + policy->l_inodebits.bits)) + continue; + + /* If somebody is already doing CANCEL, skip it. */ + if (lock->l_flags & LDLM_FL_CANCELING) + continue; + /* See CBPENDING comment in ldlm_cancel_lru */ - lock->l_flags |= LDLM_FL_CBPENDING; + lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING | + lock_flags; LASSERT(list_empty(&lock->l_bl_ast)); - list_add(&lock->l_bl_ast, &list); + list_add(&lock->l_bl_ast, cancels); LDLM_LOCK_GET(lock); + count++; } unlock_res(res); - list_for_each_safe(tmp, next, &list) { - struct lustre_handle lockh; - int rc; - lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); + /* Handle only @count inserted locks. */ + left = count; + list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { + int rc = 0; - if (flags & LDLM_FL_LOCAL_ONLY) { + if (left-- == 0) + break; + if (flags & LDLM_FL_LOCAL_ONLY) ldlm_lock_cancel(lock); + else + rc = ldlm_cli_cancel_local(lock); + + if (rc == 0) { + /* CANCEL RPC should not be sent to server. 
*/ + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); + count--; + } + } + RETURN(count); +} + +/* If @req is NULL, send CANCEL request to server with handles of locks + * in the @cancels. If EARLY_CANCEL is not supported, send CANCEL requests + * separately per lock. + * If @req is not NULL, put handles of locks in @cancels into the request + * buffer at the offset @off. + * Destroy @cancels at the end. */ +int ldlm_cli_cancel_list(struct list_head *cancels, int count, + struct ptlrpc_request *req, int off, int flags) +{ + struct ldlm_lock *lock; + int res = 0; + ENTRY; + + if (list_empty(cancels) || count == 0) + RETURN(0); + + /* XXX: requests (both batched and not) could be sent in parallel. + * Usually it is enough to have just 1 RPC, but it is possible that + * there are to many locks to be cancelled in LRU or on a resource. + * It would also speed up the case when the server does not support + * the feature. */ + while (count > 0) { + LASSERT(!list_empty(cancels)); + lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast); + LASSERT(lock->l_conn_export); + + if (exp_connect_cancelset(lock->l_conn_export)) { + res = count; + if (req) + ldlm_cancel_pack(req, off, cancels, count); + else + res = ldlm_cli_cancel_req(lock->l_conn_export, + cancels, count, flags); } else { - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc != ELDLM_OK) - CERROR("ldlm_cli_cancel: %d\n", rc); + res = ldlm_cli_cancel_req(lock->l_conn_export, + cancels, 1, flags); + } + + if (res < 0) { + CERROR("ldlm_cli_cancel_list: %d\n", res); + res = count; } - list_del_init(&lock->l_bl_ast); - LDLM_LOCK_PUT(lock); + + count -= res; + ldlm_lock_list_put(cancels, l_bl_ast, res); } + LASSERT(list_empty(cancels)); + LASSERT(count == 0); + RETURN(0); +} - ldlm_resource_putref(res); +int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, + const struct ldlm_res_id *res_id, + ldlm_policy_data_t *policy, + int mode, int flags, void *opaque) +{ + struct 
ldlm_resource *res; + CFS_LIST_HEAD(cancels); + int count; + int rc; + ENTRY; + + res = ldlm_resource_get(ns, NULL, res_id, 0, 0); + if (res == NULL) { + /* This is not a problem. */ + CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]); + RETURN(0); + } + count = ldlm_cancel_resource_local(res, &cancels, policy, mode, + 0, flags, opaque); + rc = ldlm_cli_cancel_list(&cancels, count, NULL, + DLM_LOCKREQ_OFF, flags); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc); + + ldlm_resource_putref(res); RETURN(0); } @@ -956,8 +1241,7 @@ static inline int have_no_nsresource(struct ldlm_namespace *ns) * that have 0 readers/writers. * * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying - * to notify the server. - * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */ + * to notify the server. */ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, int flags, void *opaque) @@ -969,7 +1253,8 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, RETURN(ELDLM_OK); if (res_id) - RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, flags, + RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL, + LCK_MINMODE, flags, opaque)); spin_lock(&ns->ns_hash_lock); @@ -985,10 +1270,11 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, spin_unlock(&ns->ns_hash_lock); rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name, + NULL, LCK_MINMODE, flags, opaque); if (rc) - CERROR("cancel_unused_res ("LPU64"): %d\n", + CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n", res->lr_name.name[0], rc); spin_lock(&ns->ns_hash_lock); @@ -1026,6 +1312,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns, !lock->l_readers && !lock->l_writers && !(lock->l_flags & LDLM_FL_LOCAL) && !(lock->l_flags & LDLM_FL_CBPENDING)) { + lock->l_last_used = cfs_time_current(); spin_lock(&ns->ns_unused_lock); LASSERT(ns->ns_nr_unused >= 0); list_add_tail(&lock->l_lru, &ns->ns_unused_list); @@ -1272,7 +1559,7 @@ 
static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) ldlm_lock2desc(lock, &body->lock_desc); body->lock_flags = flags; - ldlm_lock2handle(lock, &body->lock_handle1); + ldlm_lock2handle(lock, &body->lock_handle[0]); size[DLM_LOCKREPLY_OFF] = sizeof(*reply); if (lock->l_lvb_len != 0) { buffers = 3; diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 0bce589..30babf7 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -118,6 +118,26 @@ static int lprocfs_uint_rd(char *page, char **start, off_t off, return snprintf(page, count, "%u\n", *temp); } +#define MAX_STRING_SIZE 128 +static int lprocfs_uint_wr(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + unsigned *p = data; + char dummy[MAX_STRING_SIZE + 1], *end; + unsigned long tmp; + + dummy[MAX_STRING_SIZE] = '\0'; + if (copy_from_user(dummy, buffer, MAX_STRING_SIZE)) + return -EFAULT; + + tmp = simple_strtoul(dummy, &end, 0); + if (dummy == end) + return -EINVAL; + + *p = (unsigned int)tmp; + return count; +} + static int lprocfs_read_lru_size(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -125,7 +145,6 @@ static int lprocfs_read_lru_size(char *page, char **start, off_t off, return snprintf(page, count, "%u\n", ns->ns_max_unused); } -#define MAX_STRING_SIZE 128 static int lprocfs_write_lru_size(struct file *file, const char *buffer, unsigned long count, void *data) { @@ -199,6 +218,14 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) lock_vars[0].read_fptr = lprocfs_read_lru_size; lock_vars[0].write_fptr = lprocfs_write_lru_size; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + + snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_max_age", + ns->ns_name); + lock_vars[0].data = &ns->ns_max_age; + lock_vars[0].read_fptr = lprocfs_uint_rd; + lock_vars[0].write_fptr = lprocfs_uint_wr; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + } } #undef MAX_STRING_SIZE @@ -248,6 +275,7 @@ struct 
ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) CFS_INIT_LIST_HEAD(&ns->ns_unused_list); ns->ns_nr_unused = 0; ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE; + ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE; spin_lock_init(&ns->ns_unused_lock); mutex_down(&ldlm_namespace_lock); @@ -310,13 +338,15 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, lock->l_flags |= LDLM_FL_FAILED; lock->l_flags |= flags; + /* ... without sending a CANCEL message for local_only. */ + if (local_only) + lock->l_flags |= LDLM_FL_LOCAL_ONLY; + if (local_only && (lock->l_readers || lock->l_writers)) { /* This is a little bit gross, but much better than the * alternative: pretend that we got a blocking AST from * the server, so that when the lock is decref'd, it * will go away ... */ - /* ... without sending a CANCEL message. */ - lock->l_flags |= LDLM_FL_LOCAL_ONLY; unlock_res(res); LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); if (lock->l_completion_ast) @@ -330,14 +360,9 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, unlock_res(res); ldlm_lock2handle(lock, &lockh); - if (!local_only) { - rc = ldlm_cli_cancel(&lockh); - if (rc) - CERROR("ldlm_cli_cancel: %d\n", rc); - } - /* Force local cleanup on errors, too. 
*/ - if (local_only || rc != ELDLM_OK) - ldlm_lock_cancel(lock); + rc = ldlm_cli_cancel(&lockh); + if (rc) + CERROR("ldlm_cli_cancel: %d\n", rc); } else { ldlm_resource_unlink_lock(lock); unlock_res(res); diff --git a/lustre/liblustre/file.c b/lustre/liblustre/file.c index 1d73aeb..2ee4933 100644 --- a/lustre/liblustre/file.c +++ b/lustre/liblustre/file.c @@ -97,6 +97,7 @@ void llu_prep_md_op_data(struct md_op_data *op_data, struct inode *i1, op_data->op_mode = mode; op_data->op_namelen = namelen; op_data->op_mod_time = CURRENT_TIME; + op_data->op_data = NULL; } void llu_finish_md_op_data(struct md_op_data *op_data) diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 4cc8bf5..4c0c6a5 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -399,12 +399,13 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, parent = de->d_parent->d_inode; if (it->it_op & IT_CREAT) { - op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name, - de->d_name.len, 0, LUSTRE_OPC_CREATE); + op_data = ll_prep_md_op_data(NULL, parent, NULL, + de->d_name.name, de->d_name.len, + 0, LUSTRE_OPC_CREATE, NULL); } else { op_data = ll_prep_md_op_data(NULL, parent, de->d_inode, de->d_name.name, de->d_name.len, - 0, LUSTRE_OPC_ANY); + 0, LUSTRE_OPC_ANY, NULL); } if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); @@ -565,7 +566,7 @@ do_lookup: op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name, de->d_name.len, 0, (it->it_op & IT_CREAT ? 
LUSTRE_OPC_CREATE : - LUSTRE_OPC_ANY)); + LUSTRE_OPC_ANY), NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 701398e..87be848 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -302,7 +302,7 @@ static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact, struct md_op_data *op_data; op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0, - LUSTRE_OPC_ANY); + LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) return (void *)op_data; @@ -547,7 +547,7 @@ int ll_dir_setstripe(struct inode *inode, struct lov_user_md *lump) lustre_swab_lov_user_md(lump); op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, - LUSTRE_OPC_ANY); + LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 1a7c6f6..8db92ee 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -336,7 +336,7 @@ static int ll_intent_file_open(struct file *file, void *lmm, op_data = ll_prep_md_op_data(NULL, parent->d_inode, file->f_dentry->d_inode, name, len, - O_RDWR, LUSTRE_OPC_ANY); + O_RDWR, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); @@ -2022,8 +2022,6 @@ static int join_file(struct inode *head_inode, struct file *head_filp, .it_flags = head_filp->f_flags|O_JOIN_FILE}; struct lustre_handle lockh; struct md_op_data *op_data; - __u32 hsize = head_inode->i_size >> 32; - __u32 tsize = head_inode->i_size; int rc; ENTRY; @@ -2034,13 +2032,13 @@ static int join_file(struct inode *head_inode, struct file *head_filp, op_data = ll_prep_md_op_data(NULL, head_inode, tail_parent, tail_dentry->d_name.name, tail_dentry->d_name.len, 0, - LUSTRE_OPC_ANY); + LUSTRE_OPC_ANY, &head_inode->i_size); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW, - op_data, &lockh, &tsize, 0, ldlm_completion_ast, - ll_md_blocking_ast, &hsize, 0); + op_data, &lockh, NULL, 0, 
ldlm_completion_ast, + ll_md_blocking_ast, NULL, 0); ll_finish_md_op_data(op_data); if (rc < 0) @@ -2590,7 +2588,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) /* Call getattr by fid, so do not provide name at all. */ op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode, dentry->d_inode, NULL, 0, 0, - LUSTRE_OPC_ANY); + LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 9b1f5bd..2357fed 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -475,8 +475,6 @@ struct inode *ll_iget(struct super_block *sb, ino_t hash, struct dentry *ll_find_alias(struct inode *, struct dentry *); int ll_md_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, int flag); -int ll_md_cancel_unused(struct lustre_handle *, struct inode *, int flags, - void *opaque); #ifndef LUSTRE_KERNEL_VERSION struct lookup_intent *ll_convert_intent(struct open_intent *oit, int lookup_flags); @@ -613,7 +611,7 @@ int ll_ioctl_setfacl(struct inode *inode, struct rmtacl_ioctl_data *ioc); struct md_op_data *ll_prep_md_op_data(struct md_op_data *op_data, struct inode *i1, struct inode *i2, const char *name, int namelen, - int mode, __u32 opc); + int mode, __u32 opc, void *data); void ll_finish_md_op_data(struct md_op_data *op_data); /* llite/llite_nfs.c */ diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index cf4b503..10687e3 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -192,7 +192,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH | OBD_CONNECT_JOIN | OBD_CONNECT_ATTRFID | OBD_CONNECT_VERSION | - OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA; + OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | + OBD_CONNECT_CANCELSET; #ifdef CONFIG_FS_POSIX_ACL data->ocd_connect_flags |= OBD_CONNECT_ACL; 
#endif @@ -351,7 +352,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, } data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION | - OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE; + OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | + OBD_CONNECT_CANCELSET; if (sbi->ll_flags & LL_SBI_OSS_CAPA) data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA; @@ -1115,7 +1117,7 @@ int ll_md_setattr(struct inode *inode, struct md_op_data *op_data) ENTRY; op_data = ll_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0, - LUSTRE_OPC_ANY); + LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); @@ -1809,7 +1811,7 @@ int ll_iocontrol(struct inode *inode, struct file *file, RETURN(-ENOMEM); op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, - LUSTRE_OPC_ANY); + LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); @@ -2129,7 +2131,7 @@ int ll_process_config(struct lustre_cfg *lcfg) struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data, struct inode *i1, struct inode *i2, const char *name, int namelen, - int mode, __u32 opc) + int mode, __u32 opc, void *data) { LASSERT(i1 != NULL); @@ -2163,6 +2165,7 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data, op_data->op_bias = MDS_CHECK_SPLIT; op_data->op_opc = opc; op_data->op_mds = 0; + op_data->op_data = data; return op_data; } diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 83e10cb..e211165 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -169,6 +169,17 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, if (inode == NULL) break; + LASSERT(lock->l_flags & LDLM_FL_CANCELING); + if ((bits & MDS_INODELOCK_LOOKUP) && + ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP)) + bits &= ~MDS_INODELOCK_LOOKUP; + if ((bits & MDS_INODELOCK_UPDATE) && + ll_have_md_lock(inode, MDS_INODELOCK_UPDATE)) + bits &= ~MDS_INODELOCK_UPDATE; + if ((bits & MDS_INODELOCK_OPEN) && + ll_have_md_lock(inode, 
MDS_INODELOCK_OPEN)) + bits &= ~MDS_INODELOCK_OPEN; + fid = ll_inode2fid(inode); if (lock->l_resource->lr_name.name[0] != fid_seq(fid) || lock->l_resource->lr_name.name[1] != fid_oid(fid) || @@ -469,7 +480,8 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, opc = LUSTRE_OPC_ANY; op_data = ll_prep_md_op_data(NULL, parent, NULL, dentry->d_name.name, - dentry->d_name.len, lookup_flags, opc); + dentry->d_name.len, lookup_flags, opc, + NULL); if (IS_ERR(op_data)) RETURN((void *)op_data); @@ -757,7 +769,7 @@ static int ll_new_node(struct inode *dir, struct qstr *name, tgt_len = strlen(tgt) + 1; op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, - name->len, 0, opc); + name->len, 0, opc, NULL); if (IS_ERR(op_data)) GOTO(err_exit, err = PTR_ERR(op_data)); @@ -896,7 +908,7 @@ static int ll_link_generic(struct inode *src, struct inode *dir, dir->i_generation, dir, name->len, name->name); op_data = ll_prep_md_op_data(NULL, src, dir, name->name, name->len, - 0, LUSTRE_OPC_ANY); + 0, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); @@ -930,6 +942,22 @@ static int ll_mkdir_generic(struct inode *dir, struct qstr *name, RETURN(err); } +/* Try to find the child dentry by its name. + If found, put the result fid into @fid. 
*/ +static void ll_get_child_fid(struct inode * dir, struct qstr *name, + struct lu_fid *fid) +{ + struct dentry *parent, *child; + + parent = list_entry(dir->i_dentry.next, struct dentry, d_alias); + child = d_lookup(parent, name); + if (child) { + if (child->d_inode) + *fid = *ll_inode2fid(child->d_inode); + dput(child); + } +} + static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent, struct dentry *dchild, struct qstr *name) { @@ -945,10 +973,11 @@ static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent, RETURN(-EBUSY); op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, name->len, - S_IFDIR, LUSTRE_OPC_ANY); + S_IFDIR, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); + ll_get_child_fid(dir, name, &op_data->op_fid3); rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request); ll_finish_md_op_data(op_data); if (rc == 0) @@ -1051,10 +1080,11 @@ static int ll_unlink_generic(struct inode *dir, struct dentry *dparent, RETURN(-EBUSY); op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, - name->len, 0, LUSTRE_OPC_ANY); + name->len, 0, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); + ll_get_child_fid(dir, name, &op_data->op_fid3); rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request); ll_finish_md_op_data(op_data); @@ -1089,10 +1119,12 @@ static int ll_rename_generic(struct inode *src, struct dentry *src_dparent, RETURN(-EBUSY); op_data = ll_prep_md_op_data(NULL, src, tgt, NULL, 0, 0, - LUSTRE_OPC_ANY); + LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); + ll_get_child_fid(src, src_name, &op_data->op_fid3); + ll_get_child_fid(tgt, tgt_name, &op_data->op_fid4); err = md_rename(sbi->ll_md_exp, op_data, src_name->name, src_name->len, tgt_name->name, tgt_name->len, &request); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 333104d..b48e161 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1379,6 +1379,7 @@ repeat: CDEBUG(D_OTHER, 
"CREATE '%*s' on "DFID"\n", op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1)); + op_data->op_flags |= MF_MDC_CANCEL_FID1; rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid, cap_effective, rdev, request); if (rc == 0) { @@ -1709,6 +1710,97 @@ repeat: RETURN(rc); } +#define md_op_data_fid(op_data, fl) \ + (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \ + fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \ + fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \ + fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \ + NULL) + +/* @tgt_exp is the export the metadata request is sent. + * @fid_exp is the export the cancel should be sent for the current fid. + * if @fid_exp is NULL, the export is found for the current fid. + * @op_data keeps the current fid, which is pointed through @flag. + * @mode, @bits -- lock match parameters. */ +static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp, + struct obd_export *fid_exp, + struct md_op_data *op_data, + ldlm_mode_t mode, int bits, int flag) +{ + struct lu_fid *fid = md_op_data_fid(op_data, flag); + ldlm_policy_data_t policy = {{0}}; + int rc = 0; + ENTRY; + + if (!fid_is_sane(fid)) + RETURN(0); + + if (fid_exp == NULL) + fid_exp = lmv_find_export(lmv, fid); + + if (tgt_exp == fid_exp) { + /* The export is the same as on the target server, cancel + * will be sent along with the main metadata operation. */ + op_data->op_flags |= flag; + RETURN(0); + } + + policy.l_inodebits.bits = bits; + rc = md_cancel_unused(fid_exp, fid, &policy, mode, LDLM_FL_ASYNC, NULL); + RETURN(rc); +} + +#ifdef EARLY_CANCEL_FOR_STRIPED_DIR_IS_READY +/* Check if the fid in @op_data pointed to by flag is of the same export(s) + * as @tgt_exp. Early cancels will be sent later by mdc code, otherwise, call + * md_cancel_unused for child export(s). 
*/ +static int lmv_early_cancel_stripes(struct obd_export *exp, + struct obd_export *tgt_exp, + struct md_op_data *op_data, + ldlm_mode_t mode, int bits, int flag) +{ + struct lu_fid *fid = md_op_data_fid(op_data, flag); + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *st_exp; + struct lmv_obj *obj; + int rc = 0; + ENTRY; + + if (!fid_is_sane(fid)) + RETURN(0); + + obj = lmv_obj_grab(obd, fid); + if (obj) { + ldlm_policy_data_t policy = {{0}}; + struct lu_fid *st_fid; + int i; + + policy.l_inodebits.bits = bits; + for (i = 0; i < obj->lo_objcount; i++) { + st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); + st_fid = &obj->lo_inodes[i].li_fid; + if (tgt_exp != st_exp) { + rc = md_cancel_unused(st_exp, st_fid, &policy, + mode, 0, NULL); + if (rc) + break; + } else { + /* Some export matches to @tgt_exp, do cancel + * for its fid in mdc */ + *fid = *st_fid; + op_data->op_flags |= flag; + } + } + lmv_obj_put(obj); + } else { + rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, + mode, bits, flag); + } + RETURN(rc); +} +#endif + /* * llite passes fid of an target inode in op_data->op_fid1 and id of directory in * op_data->op_fid2 @@ -1718,6 +1810,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; struct lmv_obj *obj; int rc, loop = 0; mdsno_t mds; @@ -1769,7 +1862,15 @@ repeat: op_data->op_fsgid = current->fsgid; op_data->op_cap = current->cap_effective; - rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request); + tgt_exp = lmv->tgts[mds].ltd_exp; + if (op_data->op_namelen) { + op_data->op_flags |= MF_MDC_CANCEL_FID2; + /* Cancel UPDATE lock on child (fid1). 
*/ + rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX, + MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1); + } + if (rc == 0) + rc = md_link(tgt_exp, op_data, request); if (rc == -ERESTART) { LASSERT(*request != NULL); DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, @@ -1793,11 +1894,12 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, const char *old, int oldlen, const char *new, int newlen, struct ptlrpc_request **request) { + struct obd_export *tgt_exp = NULL, *src_exp; struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; int rc, mea_idx, loop = 0; struct lmv_obj *obj; - mdsno_t mds; + mdsno_t mds1, mds2; ENTRY; CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n", @@ -1818,7 +1920,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, "to "DFID"\n", newlen, new, oldlen, newlen, PFID(&op_data->op_fid2), PFID(&op_data->op_fid1)); - rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds); + rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds1); if (rc) RETURN(rc); @@ -1851,11 +1953,11 @@ repeat: mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)old, oldlen); op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid; - mds = obj->lo_inodes[mea_idx].li_mds; + mds1 = obj->lo_inodes[mea_idx].li_mds; CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1)); lmv_obj_put(obj); } else { - rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); + rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1); if (rc) RETURN(rc); } @@ -1869,9 +1971,14 @@ repeat: mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)new, newlen); + mds2 = obj->lo_inodes[mea_idx].li_mds; op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid; CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2)); lmv_obj_put(obj); + } else { + rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2); + if (rc) + RETURN(rc); } request: @@ -1879,8 +1986,36 @@ request: op_data->op_fsgid = current->fsgid; op_data->op_cap = 
current->cap_effective; - rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen, - new, newlen, request); + src_exp = lmv_get_export(lmv, mds1); + tgt_exp = lmv_get_export(lmv, mds2); + if (oldlen) { + /* LOOKUP lock on src child (fid3) should also be cancelled for + * src_exp in mdc_rename. */ + op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; + + /* Cancel UPDATE locks on tgt parent (fid2), tgt_exp is its + * own export. */ + rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, LCK_EX, + MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2); + + /* Cancel LOOKUP locks on tgt child (fid4) for parent tgt_exp.*/ + if (rc == 0) + rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, + LCK_EX, MDS_INODELOCK_LOOKUP, + MF_MDC_CANCEL_FID4); + + /* XXX: the case when child is a striped dir is not supported. + * Only the master stripe has all locks cancelled early. */ + /* Cancel all the locks on tgt child (fid4). */ + if (rc == 0) + rc = lmv_early_cancel(lmv, src_exp, NULL, op_data, + LCK_EX, MDS_INODELOCK_FULL, + MF_MDC_CANCEL_FID4); + } + + if (rc == 0) + rc = md_rename(src_exp, op_data, old, oldlen, + new, newlen, request); if (rc == -ERESTART) { LASSERT(*request != NULL); DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, @@ -1921,6 +2056,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, PFID(&op_data->op_fid1), op_data->op_attr.ia_valid, obj ? 
", split" : ""); + op_data->op_flags |= MF_MDC_CANCEL_FID1; if (obj) { for (i = 0; i < obj->lo_objcount; i++) { op_data->op_fid1 = obj->lo_inodes[i].li_fid; @@ -2224,6 +2360,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct obd_export *tgt_exp = NULL; + struct lmv_obj *obj; int rc, loop = 0; ENTRY; @@ -2241,7 +2378,6 @@ repeat: ++loop; LASSERT(loop <= 2); if (op_data->op_namelen != 0) { - struct lmv_obj *obj; int mea_idx; obj = lmv_obj_grab(obd, &op_data->op_fid1); @@ -2274,7 +2410,21 @@ repeat: op_data->op_fsgid = current->fsgid; op_data->op_cap = current->cap_effective; - rc = md_unlink(tgt_exp, op_data, request); + /* If child's fid is given, cancel unused locks for it if it is from + * another export than parent. */ + if (op_data->op_namelen) { + /* LOOKUP lock for child (fid3) should also be cancelled on + * parent tgt_exp in mdc_unlink(). */ + op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; + + /* XXX: the case when child is a striped dir is not supported. + * Only the master stripe has all locks cancelled early. */ + /* Cancel FULL locks on child (fid3). 
*/ + rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX, + MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3); + } + if (rc == 0) + rc = md_unlink(tgt_exp, op_data, request); if (rc == -ERESTART) { LASSERT(*request != NULL); DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, @@ -2539,7 +2689,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, - int flags, void *opaque) + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int flags, void *opaque) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2552,8 +2703,8 @@ static int lmv_cancel_unused(struct obd_export *exp, if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active) continue; - err = md_cancel_unused(lmv->tgts[i].ltd_exp, - fid, flags, opaque); + err = md_cancel_unused(lmv->tgts[i].ltd_exp, fid, + policy, mode, flags, opaque); if (!rc) rc = err; } diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index f390e3c..2b4122d 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -116,7 +116,9 @@ int mdc_enqueue(struct obd_export *exp, ldlm_completion_callback cb_completion, ldlm_blocking_callback cb_blocking, void *cb_data, int extra_lock_flags); - +int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, + struct list_head *cancels, ldlm_mode_t mode, + __u64 bits); /* mdc/mdc_request.c */ int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, struct md_op_data *op_data); @@ -158,6 +160,7 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request); int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, + ldlm_policy_data_t *policy, ldlm_mode_t mode, int flags, void *opaque); int mdc_lock_match(struct obd_export *exp, int flags, const struct lu_fid *fid, ldlm_type_t type, diff --git a/lustre/mdc/mdc_lib.c 
b/lustre/mdc/mdc_lib.c index 8f36d2c..11e5c58 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -128,7 +128,7 @@ void mdc_create_pack(struct ptlrpc_request *req, int offset, rec->cr_time = op_data->op_mod_time; rec->cr_suppgid1 = op_data->op_suppgids[0]; rec->cr_suppgid2 = op_data->op_suppgids[1]; - rec->cr_flags = op_data->op_flags; + rec->cr_flags = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS; rec->cr_bias = op_data->op_bias; mdc_pack_capa(req, offset + 1, op_data->op_capa1); @@ -251,7 +251,7 @@ static void mdc_epoch_pack(struct mdt_epoch *epoch, struct md_op_data *op_data) { memcpy(&epoch->handle, &op_data->op_handle, sizeof(epoch->handle)); epoch->ioepoch = op_data->op_ioepoch; - epoch->flags = op_data->op_flags; + epoch->flags = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS; } void mdc_setattr_pack(struct ptlrpc_request *req, int offset, diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 797485f..8b2961d 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -167,7 +167,8 @@ int mdc_lock_match(struct obd_export *exp, int flags, int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, - int flags, void *opaque) + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int flags, void *opaque) { struct ldlm_res_id res_id = { .name = {fid_seq(fid), @@ -178,8 +179,8 @@ int mdc_cancel_unused(struct obd_export *exp, ENTRY; - rc = ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, - flags, opaque); + rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id, + policy, mode, flags, opaque); RETURN(rc); } @@ -269,7 +270,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_reply *lockrep; int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*lockreq), - [DLM_INTENT_IT_OFF] = sizeof(*lit) }; + [DLM_INTENT_IT_OFF] = sizeof(*lit), + 0, 0, 0, 0, 0, 0 }; int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), [DLM_REPLY_REC_OFF] 
= sizeof(struct mdt_body), @@ -285,6 +287,9 @@ int mdc_enqueue(struct obd_export *exp, if (it->it_op & IT_OPEN) { int do_join = !!(it->it_flags & O_JOIN_FILE); + CFS_LIST_HEAD(cancels); + int count = 0; + int mode; it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; @@ -301,27 +306,46 @@ int mdc_enqueue(struct obd_export *exp, */ size[DLM_INTENT_REC_OFF + 4] = max(lmmsize, obddev->u.cli.cl_default_mds_easize); + + /* XXX: openlock is not cancelled for cross-refs. */ + /* If inode is known, cancel conflicting OPEN locks. */ + if (fid_is_sane(&op_data->op_fid2)) { + if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + mode = LCK_CW; +#ifdef FMODE_EXEC + else if (it->it_flags & FMODE_EXEC) + mode = LCK_PR; +#endif + else + mode = LCK_CR; + count = mdc_resource_get_unused(exp, &op_data->op_fid2, + &cancels, mode, + MDS_INODELOCK_OPEN); + } + + /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ + if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE) + mode = LCK_EX; + else + mode = LCK_CR; + count += mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, mode, + MDS_INODELOCK_UPDATE); + if (do_join) size[DLM_INTENT_REC_OFF + 5] = sizeof(struct mdt_rec_join); - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 8 + do_join, size, NULL); + req = ldlm_prep_enqueue_req(exp, 8 + do_join, size, &cancels, + count); if (!req) RETURN(-ENOMEM); if (do_join) { - __u64 head_size = *(__u32*)cb_data; - __u32 tsize = *(__u32*)lmm; - /* join is like an unlink of the tail */ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - /* when joining file, cb_data and lmm args together - * indicate the head file size*/ mdc_join_pack(req, DLM_INTENT_REC_OFF + 5, op_data, - (head_size << 32) | tsize); - cb_data = NULL; - lmm = NULL; + (*(__u64 *)op_data->op_data)); } spin_lock(&req->rq_lock); @@ -350,8 +374,7 @@ int mdc_enqueue(struct obd_export *exp, sizeof(struct lustre_capa) : 0; size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1; 
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 6, size, NULL); + req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -378,8 +401,7 @@ int mdc_enqueue(struct obd_export *exp, if (it->it_op & IT_GETATTR) policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 6, size, NULL); + req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -398,8 +420,7 @@ int mdc_enqueue(struct obd_export *exp, repsize[repbufcnt++] = sizeof(struct lustre_capa); } else if (it->it_op == IT_READDIR) { policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (!req) RETURN(-ENOMEM); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 63bbb0c..f14c7f6 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -39,6 +39,7 @@ #include #include "mdc_internal.h" +#include /* mdc_setattr does its own semaphore handling */ static int mdc_reint(struct ptlrpc_request *request, @@ -62,6 +63,33 @@ static int mdc_reint(struct ptlrpc_request *request, return rc; } +/* Find and cancel locally locks matched by inode @bits & @mode in the resource + * found by @fid. Found locks are added into @cancel list. Returns the amount of + * locks added to @cancels list. */ +int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, + struct list_head *cancels, ldlm_mode_t mode, + __u64 bits) +{ + ldlm_policy_data_t policy = {{0}}; + struct ldlm_res_id res_id; + struct ldlm_resource *res; + int count; + ENTRY; + + fid_build_reg_res_name(fid, &res_id); + res = ldlm_resource_get(exp->exp_obd->obd_namespace, + NULL, &res_id, 0, 0); + if (res == NULL) + RETURN(0); + + /* Initialize ibits lock policy. 
*/ + policy.l_inodebits.bits = bits; + count = ldlm_cancel_resource_local(res, cancels, &policy, + mode, 0, 0, NULL); + ldlm_resource_putref(res); + RETURN(count); +} + /* If mdc_setattr is called with an 'iattr', then it is a normal RPC that * should take the normal semaphore and go to the normal portal. * @@ -72,19 +100,21 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, void *ea, int ealen, void *ea2, int ea2len, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct ptlrpc_request *req; struct mdt_rec_setattr *rec; struct mdc_rpc_lock *rpc_lock; struct obd_device *obd = exp->exp_obd; - int size[6] = { sizeof(struct ptlrpc_body), - sizeof(*rec), 0, 0, ealen, ea2len }; - int bufcount = 4, rc; + int size[7] = { sizeof(struct ptlrpc_body), + sizeof(*rec), 0, 0, ealen, ea2len, 0 }; + int count = 0, bufcount = 4, rc; + __u64 bits; ENTRY; LASSERT(op_data != NULL); size[REQ_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; + sizeof(struct lustre_capa) : 0; if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) size[REQ_REC_OFF + 2] = sizeof(struct mdt_epoch); @@ -95,8 +125,24 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, bufcount++; } + bits = MDS_INODELOCK_UPDATE; + if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID)) + bits |= MDS_INODELOCK_LOOKUP; + if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && + (fid_is_sane(&op_data->op_fid1))) + count = mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, LCK_EX, bits); + if (exp_connect_cancelset(exp) && count) { + bufcount = 7; + size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); @@ -129,12 +175,14 @@ int mdc_create(struct obd_export *exp, struct 
md_op_data *op_data, const void *data, int datalen, int mode, __u32 uid, __u32 gid, __u32 cap_effective, __u64 rdev, struct ptlrpc_request **request) { - int size[5] = { sizeof(struct ptlrpc_body), + int size[6] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_rec_create), - 0, op_data->op_namelen + 1 }; + 0, op_data->op_namelen + 1, 0, 0 }; struct obd_device *obd = exp->exp_obd; int level, bufcount = 4, rc; struct ptlrpc_request *req; + int count = 0; + CFS_LIST_HEAD(cancels); ENTRY; /* For case if upper layer did not alloc fid, do it now. */ @@ -158,8 +206,22 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data, bufcount++; } + if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && + (fid_is_sane(&op_data->op_fid1))) + count = mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, LCK_EX, + MDS_INODELOCK_UPDATE); + if (exp_connect_cancelset(exp) && count) { + bufcount = 6; + size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); @@ -206,21 +268,41 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data, int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req = *request; - int size[4] = { sizeof(struct ptlrpc_body), + int size[5] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_rec_unlink), - 0, op_data->op_namelen + 1 }; - int rc; + 0, op_data->op_namelen + 1, 0 }; + int count = 0, rc, bufcount = 4; ENTRY; LASSERT(req == NULL); size[REQ_REC_OFF + 1] = op_data->op_capa1 ? 
- sizeof(struct lustre_capa) : 0; + sizeof(struct lustre_capa) : 0; + if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && + (fid_is_sane(&op_data->op_fid1))) + count = mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, LCK_EX, + MDS_INODELOCK_UPDATE); + if ((op_data->op_flags & MF_MDC_CANCEL_FID3) && + (fid_is_sane(&op_data->op_fid3))) + count += mdc_resource_get_unused(exp, &op_data->op_fid3, + &cancels, LCK_EX, + MDS_INODELOCK_FULL); + if (exp_connect_cancelset(exp) && count) { + bufcount = 5; + size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_REINT, 4, size, NULL); + MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3, 0); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); *request = req; @@ -241,12 +323,13 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data, int mdc_link(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req; - int size[5] = { sizeof(struct ptlrpc_body), + int size[6] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_rec_link), - 0, 0, op_data->op_namelen + 1 }; - int rc; + 0, 0, op_data->op_namelen + 1, 0 }; + int count = 0, rc, bufcount = 5; ENTRY; size[REQ_REC_OFF + 1] = op_data->op_capa1 ? @@ -254,8 +337,28 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data, size[REQ_REC_OFF + 2] = op_data->op_capa2 ? 
sizeof(struct lustre_capa) : 0; + if ((op_data->op_flags & MF_MDC_CANCEL_FID2) && + (fid_is_sane(&op_data->op_fid2))) + count = mdc_resource_get_unused(exp, &op_data->op_fid2, + &cancels, LCK_EX, + MDS_INODELOCK_UPDATE); + if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && + (fid_is_sane(&op_data->op_fid1))) + count += mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, LCK_EX, + MDS_INODELOCK_UPDATE); + if (exp_connect_cancelset(exp) && count) { + bufcount = 6; + size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT); + } + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_REINT, 5, size, NULL); + MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); @@ -276,12 +379,13 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data, const char *old, int oldlen, const char *new, int newlen, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req; - int size[6] = { sizeof(struct ptlrpc_body), + int size[7] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_rec_rename), - 0, 0, oldlen + 1, newlen + 1 }; - int rc; + 0, 0, oldlen + 1, newlen + 1, 0 }; + int count = 0, rc, bufcount = 6; ENTRY; size[REQ_REC_OFF + 1] = op_data->op_capa1 ? @@ -289,8 +393,37 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data, size[REQ_REC_OFF + 2] = op_data->op_capa2 ? 
sizeof(struct lustre_capa) : 0; + if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && + (fid_is_sane(&op_data->op_fid1))) + count = mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, LCK_EX, + MDS_INODELOCK_UPDATE); + if ((op_data->op_flags & MF_MDC_CANCEL_FID2) && + (fid_is_sane(&op_data->op_fid2))) + count += mdc_resource_get_unused(exp, &op_data->op_fid2, + &cancels, LCK_EX, + MDS_INODELOCK_UPDATE); + if ((op_data->op_flags & MF_MDC_CANCEL_FID3) && + (fid_is_sane(&op_data->op_fid3))) + count += mdc_resource_get_unused(exp, &op_data->op_fid3, + &cancels, LCK_EX, + MDS_INODELOCK_LOOKUP); + if ((op_data->op_flags & MF_MDC_CANCEL_FID4) && + (fid_is_sane(&op_data->op_fid4))) + count += mdc_resource_get_unused(exp, &op_data->op_fid4, + &cancels, LCK_EX, + MDS_INODELOCK_FULL); + if (exp_connect_cancelset(exp) && count) { + bufcount = 7; + size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_REINT, 6, size, NULL); + MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 20605d8..6ca08a3 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1487,6 +1487,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) GOTO(err_close_lock, rc); lprocfs_init_vars(mdc, &lvars); lprocfs_obd_setup(obd, lvars.obd_vars); + ptlrpc_lprocfs_register_obd(obd); rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL); if (rc) { diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 6d1a6e7..56f3a22 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -2280,7 +2280,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, struct obd_export *exp = req->rq_export; struct ldlm_request *dlmreq = 
lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq)); - struct lustre_handle remote_hdl = dlmreq->lock_handle1; + struct lustre_handle remote_hdl = dlmreq->lock_handle[0]; struct list_head *iter; if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 4aabd13..18bec0d 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -2620,7 +2620,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, return; dlmreq = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ); - remote_hdl = dlmreq->lock_handle1; + remote_hdl = dlmreq->lock_handle[0]; spin_lock(&exp->exp_ldlm_data.led_lock); list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index 01793a5..856c7c7 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -755,6 +755,18 @@ static int mdt_epoch_unpack(struct mdt_thread_info *info) RETURN(info->mti_epoch == NULL ? 
-EFAULT : 0); } +static inline int mdt_dlmreq_unpack(struct mdt_thread_info *info) { + struct req_capsule *pill = &info->mti_pill; + + if (req_capsule_get_size(pill, &RMF_DLM_REQ, RCL_CLIENT)) { + info->mti_dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ); + if (info->mti_dlm_req == NULL) + RETURN(-EFAULT); + } + + RETURN(0); +} + static int mdt_setattr_unpack(struct mdt_thread_info *info) { struct md_attr *ma = &info->mti_attr; @@ -769,23 +781,21 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info) /* Epoch may be absent */ mdt_epoch_unpack(info); - if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { + ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT); + if (ma->ma_lmm_size) { ma->ma_lmm = req_capsule_client_get(pill, &RMF_EADATA); - ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT); ma->ma_valid |= MA_LOV; } - if (req_capsule_field_present(pill, &RMF_LOGCOOKIES, RCL_CLIENT)) { - ma->ma_cookie = req_capsule_client_get(pill, - &RMF_LOGCOOKIES); - ma->ma_cookie_size = req_capsule_get_size(pill, - &RMF_LOGCOOKIES, - RCL_CLIENT); + ma->ma_cookie_size = req_capsule_get_size(pill, &RMF_LOGCOOKIES, + RCL_CLIENT); + if (ma->ma_cookie_size) { + ma->ma_cookie = req_capsule_client_get(pill, &RMF_LOGCOOKIES); ma->ma_valid |= MA_COOKIE; } - RETURN(0); + rc = mdt_dlmreq_unpack(info); + RETURN(rc); } int mdt_close_unpack(struct mdt_thread_info *info) @@ -808,6 +818,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info) struct mdt_reint_record *rr = &info->mti_rr; struct req_capsule *pill = &info->mti_pill; struct md_op_spec *sp = &info->mti_spec; + int rc; ENTRY; rec = req_capsule_client_get(pill, &RMF_REC_CREATE); @@ -875,19 +886,24 @@ static int mdt_create_unpack(struct mdt_thread_info *info) &RMF_EADATA, RCL_CLIENT); sp->u.sp_ea.fid = rr->rr_fid1; + RETURN(0); } + req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL); } else if (S_ISLNK(attr->la_mode)) { const char *tgt = NULL; 
req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SYM); - if (req_capsule_field_present(pill, &RMF_SYMTGT, RCL_CLIENT)) { + if (req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT)) { tgt = req_capsule_client_get(pill, &RMF_SYMTGT); sp->u.sp_symname = tgt; } if (tgt == NULL) RETURN(-EFAULT); + } else { + req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL); } - RETURN(0); + rc = mdt_dlmreq_unpack(info); + RETURN(rc); } static int mdt_link_unpack(struct mdt_thread_info *info) @@ -897,6 +913,7 @@ static int mdt_link_unpack(struct mdt_thread_info *info) struct lu_attr *attr = &info->mti_attr.ma_attr; struct mdt_reint_record *rr = &info->mti_rr; struct req_capsule *pill = &info->mti_pill; + int rc; ENTRY; rec = req_capsule_client_get(pill, &RMF_REC_LINK); @@ -932,7 +949,8 @@ static int mdt_link_unpack(struct mdt_thread_info *info) info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT); info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF); - RETURN(0); + rc = mdt_dlmreq_unpack(info); + RETURN(rc); } static int mdt_unlink_unpack(struct mdt_thread_info *info) @@ -943,6 +961,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) struct lu_attr *attr = &info->mti_attr.ma_attr; struct mdt_reint_record *rr = &info->mti_rr; struct req_capsule *pill = &info->mti_pill; + int rc; ENTRY; rec = req_capsule_client_get(pill, &RMF_REC_UNLINK); @@ -980,7 +999,8 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) else ma->ma_attr_flags &= ~MDS_VTX_BYPASS; - RETURN(0); + rc = mdt_dlmreq_unpack(info); + RETURN(rc); } static int mdt_rename_unpack(struct mdt_thread_info *info) @@ -991,6 +1011,7 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) struct lu_attr *attr = &info->mti_attr.ma_attr; struct mdt_reint_record *rr = &info->mti_rr; struct req_capsule *pill = &info->mti_pill; + int rc; ENTRY; rec = req_capsule_client_get(pill, &RMF_REC_RENAME); @@ -1035,7 +1056,8 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) else ma->ma_attr_flags 
&= ~MDS_VTX_BYPASS; - RETURN(0); + rc = mdt_dlmreq_unpack(info); + RETURN(rc); } static int mdt_open_unpack(struct mdt_thread_info *info) @@ -1046,6 +1068,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info) struct req_capsule *pill = &info->mti_pill; struct mdt_reint_record *rr = &info->mti_rr; struct ptlrpc_request *req = mdt_info_req(info); + struct md_op_spec *sp = &info->mti_spec; ENTRY; rec = req_capsule_client_get(pill, &RMF_REC_CREATE); @@ -1101,13 +1124,10 @@ static int mdt_open_unpack(struct mdt_thread_info *info) rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; LASSERT(rr->rr_namelen > 0); - if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { - struct md_op_spec *sp = &info->mti_spec; - sp->u.sp_ea.eadata = req_capsule_client_get(pill, - &RMF_EADATA); - sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, - &RMF_EADATA, - RCL_CLIENT); + sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT); + if (sp->u.sp_ea.eadatalen) { + sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA); if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) sp->u.sp_ea.no_lov_create = 1; } diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index ec8298e..34466b8 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -294,6 +294,9 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1), (unsigned int)ma->ma_attr.la_valid); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0); + repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(mo)) @@ -408,6 +411,9 @@ static int mdt_reint_create(struct mdt_thread_info *info, if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) GOTO(out, rc = err_serious(-ESTALE)); + if (info->mti_dlm_req) + ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0); + switch 
(info->mti_attr.ma_attr.la_mode & S_IFMT) { case S_IFDIR:{ /* Cross-ref case. */ @@ -453,6 +459,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1), rr->rr_name); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) GOTO(out, rc = err_serious(-ENOENT)); @@ -556,6 +565,9 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) GOTO(out, rc = err_serious(-ENOENT)); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (info->mti_cross_ref) { /* MDT holding name ask us to add ref. */ lhs = &info->mti_lh[MDT_LH_CHILD]; @@ -813,6 +825,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info, int rc; ENTRY; + if (info->mti_dlm_req) + ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0); + if (info->mti_cross_ref) { rc = mdt_reint_rename_tgt(info); RETURN(rc); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 6ff5ab8..88c46c5 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -573,6 +573,28 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, return rc; } +/* Find and cancel locally locks matched by @mode in the resource found by + * @objid. Found locks are added into @cancel list. Returns the amount of + * locks added to @cancels list. 
*/ +static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, + struct list_head *cancels, ldlm_mode_t mode, + int lock_flags) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } }; + struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); + int count; + ENTRY; + + if (res == NULL) + RETURN(0); + + count = ldlm_cancel_resource_local(res, cancels, NULL, mode, + lock_flags, 0, NULL); + ldlm_resource_putref(res); + RETURN(count); +} + /* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. @@ -587,9 +609,11 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *ea, struct obd_trans_info *oti, struct obd_export *md_export) { + CFS_LIST_HEAD(cancels); struct ptlrpc_request *req; struct ost_body *body; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 }; + int count, bufcount = 2; ENTRY; if (!oa) { @@ -597,8 +621,19 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, RETURN(-EINVAL); } + count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW, + LDLM_FL_DISCARD_DATA); + if (exp_connect_cancelset(exp) && count) { + bufcount = 3; + size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_DESTROY, 2, size, NULL); + OST_DESTROY, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (!req) RETURN(-ENOMEM); @@ -2865,10 +2900,10 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, if (intent) { int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) }; + 
[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), + [DLM_LOCKREQ_OFF + 1] = 0 }; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (req == NULL) RETURN(-ENOMEM); @@ -2987,8 +3022,7 @@ static int osc_cancel_unused(struct obd_export *exp, resp = &res_id; } - return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, - opaque); + return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); } static int osc_join_lru(struct obd_export *exp, diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 2b14646..888ce73 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -92,6 +92,15 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, if (body == NULL) RETURN(-EFAULT); + if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) { + struct ldlm_request *dlm; + dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm), + lustre_swab_ldlm_request); + if (dlm == NULL) + RETURN (-EFAULT); + ldlm_request_cancel(req, dlm, 0); + } + rc = lustre_pack_reply(req, 2, size, NULL); if (rc) RETURN(rc); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index e7af9cf..d014180 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -137,7 +137,8 @@ static const struct req_msg_field *mds_reint_create_rmt_acl_client[] = { &RMF_REC_CREATE, &RMF_CAPA1, &RMF_NAME, - &RMF_EADATA + &RMF_EADATA, + &RMF_DLM_REQ }; static const struct req_msg_field *mds_reint_create_sym_client[] = { @@ -145,7 +146,8 @@ static const struct req_msg_field *mds_reint_create_sym_client[] = { &RMF_REC_CREATE, &RMF_CAPA1, &RMF_NAME, - &RMF_SYMTGT + &RMF_SYMTGT, + &RMF_DLM_REQ }; static const struct req_msg_field *mds_reint_create_slave_client[] = { @@ -178,7 +180,8 @@ static const struct req_msg_field *mds_reint_unlink_client[] = { &RMF_PTLRPC_BODY, &RMF_REC_UNLINK, &RMF_CAPA1, - &RMF_NAME + &RMF_NAME, + &RMF_DLM_REQ }; static 
const struct req_msg_field *mds_reint_link_client[] = { @@ -186,7 +189,8 @@ static const struct req_msg_field *mds_reint_link_client[] = { &RMF_REC_LINK, &RMF_CAPA1, &RMF_CAPA2, - &RMF_NAME + &RMF_NAME, + &RMF_DLM_REQ }; static const struct req_msg_field *mds_reint_rename_client[] = { @@ -195,7 +199,8 @@ static const struct req_msg_field *mds_reint_rename_client[] = { &RMF_CAPA1, &RMF_CAPA2, &RMF_NAME, - &RMF_SYMTGT + &RMF_SYMTGT, + &RMF_DLM_REQ }; static const struct req_msg_field *mds_last_unlink_server[] = { @@ -211,7 +216,8 @@ static const struct req_msg_field *mds_reint_setattr_client[] = { &RMF_CAPA1, &RMF_MDT_EPOCH, &RMF_EADATA, - &RMF_LOGCOOKIES + &RMF_LOGCOOKIES, + &RMF_DLM_REQ }; static const struct req_msg_field *mds_connect_client[] = { @@ -925,6 +931,7 @@ static void *__req_capsule_get(struct req_capsule *pill, field->rmf_name, offset, lustre_msg_bufcount(msg), fmt->rf_name, lustre_msg_buflen(msg, offset), field->rmf_size, rcl_names[loc]); + return value; } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index ab389d0..08d5192 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2173,10 +2173,9 @@ void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l) void lustre_swab_ldlm_request (struct ldlm_request *rq) { __swab32s (&rq->lock_flags); - CLASSERT(offsetof(typeof(*rq), lock_padding) != 0); lustre_swab_ldlm_lock_desc (&rq->lock_desc); - /* lock_handle1 opaque */ - /* lock_handle2 opaque */ + __swab32s (&rq->lock_count); + /* lock_handle[] opaque */ } void lustre_swab_ldlm_reply (struct ldlm_reply *r) diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index e20e618..3d3e732f 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -469,6 +469,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL); CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL); CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL); + CLASSERT(OBD_CONNECT_CANCELSET == 
0x01000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", @@ -1474,22 +1475,18 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_request, lock_flags)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_request, lock_count) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_request, lock_count)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_count) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_count)); LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", (long long)(int)offsetof(struct ldlm_request, lock_desc)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); - LASSERTF((int)offsetof(struct ldlm_request, lock_handle1) == 88, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle1)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle1)); - LASSERTF((int)offsetof(struct ldlm_request, lock_handle2) == 96, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle2)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle2)); + LASSERTF((int)offsetof(struct ldlm_request, lock_handle) == 88, " found %lld\n", + (long long)(int)offsetof(struct 
ldlm_request, lock_handle)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle) == 16, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle)); /* Checks for struct ldlm_reply */ LASSERTF((int)sizeof(struct ldlm_reply) == 112, " found %lld\n", @@ -1498,14 +1495,14 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_reply, lock_flags)); LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); - LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_desc)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_padding) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_padding)); + LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_padding) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_desc) == 8, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_desc)); + LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_desc)); LASSERTF((int)offsetof(struct ldlm_reply, lock_handle) == 88, " found %lld\n", (long long)(int)offsetof(struct ldlm_reply, lock_handle)); LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8, " found %lld\n", diff --git a/lustre/tests/sanity-lmv.sh 
b/lustre/tests/sanity-lmv.sh index 0c76144..fecae0e 100644 --- a/lustre/tests/sanity-lmv.sh +++ b/lustre/tests/sanity-lmv.sh @@ -381,6 +381,52 @@ test_4a() { ## this test is very time-consuming, don't run it by default #run_test 4a " FIDS/ nlink overflow test =============================" +test_5a() { + mount_client $MOUNT2 + # create a cross-ref file + mkdir -p $MOUNT/$tdir/d1 + mkdir -p $MOUNT2/$tdir/d2 + dd if=/dev/zero of=$MOUNT/$tdir/d1/f1 count=1 + mv $MOUNT2/$tdir/d1/f1 $MOUNT2/$tdir/d2/ + # XXX: a check that the file is a cross-ref one is needed. + cancel_lru_locks mdc + cancel_lru_locks osc + dd if=$MOUNT2/$tdir/d2/f1 of=/dev/null + stat $MOUNT2/$tdir/d2 $MOUNT2/$tdir/d2/f1 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats` + unlink $MOUNT2/$tdir/d2/f1 + can2=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats` + umount $MOUNT2 + [ $can1 -eq $can2 ] && error "It does not look like a cross-ref file." + [ $[$can1+1] -eq $can2 ] || error $[$[$can2-$can1]] "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $[$[$blk2-$blk1]] "blocking RPC occured." 
+} +run_test 5a "Early Lock Cancel: cross-ref unlink" + +test_5b() { + mount_client $MOUNT2 + # create a cross-ref file + mkdir -p $MOUNT/$tdir/d1 + mkdir -p $MOUNT2/$tdir/d2 + dd if=/dev/zero of=$MOUNT/$tdir/d1/f1 count=1 + cancel_lru_locks mdc + cancel_lru_locks osc + dd if=$MOUNT2/$tdir/d1/f1 of=/dev/null + stat $MOUNT2/$tdir/d1/f1 $MOUNT2/$tdir/d2 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats` + ln $MOUNT2/$tdir/d1/f1 $MOUNT2/$tdir/d2/f2 + can2=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats` + umount $MOUNT2 + [ $can1 -eq $can2 ] && error "It does not look like a cross-ref file." + [ $[$can1+1] -eq $can2 ] || error $[$[$can2-$can1]] "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $[$[$blk2-$blk1]] "blocking RPC occured." +} +run_test 5b "Early Lock Cancel: cross-ref link" + TMPDIR=$OLDTMPDIR TMP=$OLDTMP HOME=$OLDHOME diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 53efa0b..06326a8 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -3983,6 +3983,128 @@ test_119b() # bug 11737 } run_test 119b "Sparse directIO read must return actual read amount" +test_119a() { + mkdir $DIR/$tdir + cancel_lru_locks mdc + stat $DIR/$tdir > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + mkdir $DIR/$tdir/d1 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." 
+} +run_test 119a "Early Lock Cancel: mkdir test" + +test_119b() { + mkdir $DIR/$tdir + cancel_lru_locks mdc + stat $DIR/$tdir > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + touch $DIR/$tdir/f1 + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." +} +run_test 119b "Early Lock Cancel: create test" + +test_119c() { + mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 + touch $DIR/$tdir/d1/f1 + cancel_lru_locks mdc + stat $DIR/$tdir/d1 $DIR/$tdir/d2 $DIR/$tdir/d1/f1 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + ln $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." +} +run_test 119c "Early Lock Cancel: link test" + +test_119d() { + touch $DIR/$tdir + cancel_lru_locks mdc + stat $DIR/$tdir > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + chmod a+x $DIR/$tdir + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." 
+} +run_test 119d "Early Lock Cancel: setattr test" + +test_119e() { + mkdir $DIR/$tdir + dd if=/dev/zero of=$DIR/$tdir/f1 count=1 + cancel_lru_locks mdc + cancel_lru_locks osc + dd if=$DIR/$tdir/f1 of=/dev/null + stat $DIR/$tdir $DIR/$tdir/f1 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + unlink $DIR/$tdir/f1 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." +} +run_test 119e "Early Lock Cancel: unlink test" + +test_119f() { + mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 + dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1 + dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1 + cancel_lru_locks mdc + cancel_lru_locks osc + dd if=$DIR/$tdir/d1/f1 of=/dev/null + dd if=$DIR/$tdir/d2/f2 of=/dev/null + stat $DIR/$tdir/d1 $DIR/$tdir/d2 $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + mv $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." 
+} +run_test 119f "Early Lock Cancel: rename test" + +test_119g() { + count=10000 + echo create $count files + mkdir $DIR/$tdir + cancel_lru_locks mdc + cancel_lru_locks osc + t0=`date +%s` + + can0=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk0=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + createmany -o $DIR/$tdir/f $count + sync + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + t1=`date +%s` + echo total: $((can1-can0)) cancels, $((blk1-blk0)) blockings + echo rm $count files + rm -r $DIR/$tdir + sync + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + t2=`date +%s` + echo total: $count removes in $((t2-t1)) + echo total: $((can2-can1)) cancels, $((blk2-blk1)) blockings + sleep 2 + # wait for commitment of removal +} +run_test 119g "Early Lock Cancel: performance test" + TMPDIR=$OLDTMPDIR TMP=$OLDTMP HOME=$OLDHOME diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 5fe8169..4fd177b 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -180,6 +180,7 @@ static void check_obd_connect_data(void) CHECK_CDEFINE(OBD_CONNECT_OSS_CAPA); CHECK_CDEFINE(OBD_CONNECT_MDS_MDS); CHECK_CDEFINE(OBD_CONNECT_SOM); + CHECK_CDEFINE(OBD_CONNECT_CANCELSET); } static void @@ -664,10 +665,9 @@ check_ldlm_request(void) BLANK_LINE(); CHECK_STRUCT(ldlm_request); CHECK_MEMBER(ldlm_request, lock_flags); - CHECK_MEMBER(ldlm_request, lock_padding); + CHECK_MEMBER(ldlm_request, lock_count); CHECK_MEMBER(ldlm_request, lock_desc); - CHECK_MEMBER(ldlm_request, lock_handle1); - CHECK_MEMBER(ldlm_request, lock_handle2); + CHECK_MEMBER(ldlm_request, lock_handle); } static void @@ -676,8 +676,8 @@ check_ldlm_reply(void) BLANK_LINE(); CHECK_STRUCT(ldlm_reply); CHECK_MEMBER(ldlm_reply, 
lock_flags); - CHECK_MEMBER(ldlm_request, lock_padding); - CHECK_MEMBER(ldlm_request, lock_desc); + CHECK_MEMBER(ldlm_reply, lock_padding); + CHECK_MEMBER(ldlm_reply, lock_desc); CHECK_MEMBER(ldlm_reply, lock_handle); CHECK_MEMBER(ldlm_reply, lock_policy_res1); CHECK_MEMBER(ldlm_reply, lock_policy_res2); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 45b5c95..66065c3 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -485,6 +485,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL); CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL); CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL); + CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", @@ -1490,22 +1491,18 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_request, lock_flags)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_request, lock_count) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_request, lock_count)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_count) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_count)); LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", (long long)(int)offsetof(struct ldlm_request, lock_desc)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); - LASSERTF((int)offsetof(struct 
ldlm_request, lock_handle1) == 88, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle1)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle1)); - LASSERTF((int)offsetof(struct ldlm_request, lock_handle2) == 96, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle2)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle2)); + LASSERTF((int)offsetof(struct ldlm_request, lock_handle) == 88, " found %lld\n", + (long long)(int)offsetof(struct ldlm_request, lock_handle)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle) == 16, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle)); /* Checks for struct ldlm_reply */ LASSERTF((int)sizeof(struct ldlm_reply) == 112, " found %lld\n", @@ -1514,14 +1511,14 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_reply, lock_flags)); LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); - LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_desc)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_padding) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_padding)); + 
LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_padding) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_desc) == 8, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_desc)); + LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_desc)); LASSERTF((int)offsetof(struct ldlm_reply, lock_handle) == 88, " found %lld\n", (long long)(int)offsetof(struct ldlm_reply, lock_handle)); LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8, " found %lld\n", -- 1.8.3.1