From 5516349042147d5553e5861a6f61739771d5f90d Mon Sep 17 00:00:00 2001 From: vitaly Date: Thu, 21 Jun 2007 00:29:07 +0000 Subject: [PATCH] Land b1_6_elc onto b1_6 (20070621_0218) land Early Lock Cancel code --- lustre/ChangeLog | 7 + lustre/include/lustre/lustre_idl.h | 29 +- lustre/include/lustre_dlm.h | 85 ++++-- lustre/include/lustre_mds.h | 22 +- lustre/include/obd.h | 25 +- lustre/include/obd_class.h | 20 +- lustre/include/obd_ost.h | 2 +- lustre/include/obd_support.h | 2 + lustre/ldlm/ldlm_extent.c | 5 +- lustre/ldlm/ldlm_flock.c | 2 +- lustre/ldlm/ldlm_inodebits.c | 2 - lustre/ldlm/ldlm_internal.h | 10 +- lustre/ldlm/ldlm_lock.c | 50 +--- lustre/ldlm/ldlm_lockd.c | 174 +++++++---- lustre/ldlm/ldlm_plain.c | 2 - lustre/ldlm/ldlm_request.c | 585 ++++++++++++++++++++++++++++--------- lustre/ldlm/ldlm_resource.c | 46 ++- lustre/liblustre/dir.c | 8 +- lustre/liblustre/file.c | 1 + lustre/liblustre/rw.c | 10 +- lustre/liblustre/super.c | 31 +- lustre/llite/dcache.c | 4 +- lustre/llite/dir.c | 12 +- lustre/llite/file.c | 50 ++-- lustre/llite/llite_internal.h | 2 +- lustre/llite/llite_lib.c | 10 +- lustre/llite/namei.c | 73 +++-- lustre/lov/lov_internal.h | 7 +- lustre/lov/lov_obd.c | 19 +- lustre/lov/lov_request.c | 14 +- lustre/mdc/mdc_locks.c | 119 ++++---- lustre/mdc/mdc_reint.c | 153 ++++++++-- lustre/mds/handler.c | 2 +- lustre/mds/mds_lib.c | 55 +++- lustre/mds/mds_reint.c | 15 + lustre/mgc/mgc_request.c | 9 +- lustre/obdecho/echo_client.c | 4 +- lustre/obdfilter/filter.c | 12 +- lustre/osc/osc_request.c | 83 ++++-- lustre/ost/ost_handler.c | 9 + lustre/ptlrpc/client.c | 10 +- lustre/ptlrpc/pack_generic.c | 5 +- lustre/ptlrpc/wiretest.c | 41 ++- lustre/tests/recovery-small.sh | 12 + lustre/tests/sanity.sh | 133 +++++++++ lustre/utils/wirecheck.c | 12 +- lustre/utils/wiretest.c | 41 ++- 47 files changed, 1422 insertions(+), 602 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 13feaee..3c1e444 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -318,6 +318,13 @@ Description: Short directio read returns full requested size rather than Details : Direct I/O operations should return actual amount of bytes transferred rather than requested size. +Severity : enhancement +Bugzilla : 10589 +Description: metadata RPC reduction (e.g. for rm performance) +Details : decrease the amount of synchronous RPC between clients and servers + by canceling conflicing lock before the operation on the client side + and packing thier handles into the main operation RPC to server. + -------------------------------------------------------------------------------- 2007-05-03 Cluster File Systems, Inc. diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 980f588..e19088b 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -283,17 +283,20 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_CONNECT_QUOTA64 0x80000ULL /* 64bit qunit_data.qd_count b=10707*/ #define OBD_CONNECT_FID_CAPA 0x100000ULL /* fid capability */ #define OBD_CONNECT_OSS_CAPA 0x200000ULL /* OSS capability */ +#define OBD_CONNECT_CANCELSET 0x400000ULL /* Early batched cancels. 
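+ * When both sides advertise this flag the client cancels conflicting and
+ * unused locks locally and packs their handles into the main operation RPC
+ * (enqueue or reint) instead of issuing separate synchronous CANCELs
+ * (see exp_connect_cancelset() and ldlm_prep_enqueue_req()).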
*/ /* also update obd_connect_names[] for lprocfs_rd_connect_flags() * and lustre/utils/wirecheck.c */ #define MDS_CONNECT_SUPPORTED (OBD_CONNECT_RDONLY | OBD_CONNECT_VERSION | \ OBD_CONNECT_ACL | OBD_CONNECT_XATTR | \ OBD_CONNECT_IBITS | OBD_CONNECT_JOIN | \ - OBD_CONNECT_NODEVOH | OBD_CONNECT_ATTRFID) + OBD_CONNECT_NODEVOH | OBD_CONNECT_ATTRFID | \ + OBD_CONNECT_CANCELSET) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \ - OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64) + OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64 | \ + OBD_CONNECT_CANCELSET) #define ECHO_CONNECT_SUPPORTED (0) #define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION) @@ -306,6 +309,9 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_OCD_VERSION_PATCH(version) ((int)((version)>>8)&255) #define OBD_OCD_VERSION_FIX(version) ((int)(version)&255) +#define exp_connect_cancelset(exp) \ + ((exp) ? (exp)->exp_connect_flags & OBD_CONNECT_CANCELSET : 0) + /* This structure is used for both request and reply. * * If we eventually have separate connect data for different types, which we @@ -1046,14 +1052,27 @@ struct ldlm_lock_desc { extern void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l); +#define LDLM_LOCKREQ_HANDLES 2 +#define LDLM_ENQUEUE_CANCEL_OFF 1 + struct ldlm_request { __u32 lock_flags; - __u32 lock_padding; /* also fix lustre_swab_ldlm_request */ + __u32 lock_count; struct ldlm_lock_desc lock_desc; - struct lustre_handle lock_handle1; - struct lustre_handle lock_handle2; + struct lustre_handle lock_handle[LDLM_LOCKREQ_HANDLES]; }; +/* If LDLM_ENQUEUE, 1 slot is already occupied, 1 is available. + * Otherwise, 2 are available. */ +#define ldlm_request_bufsize(count,type) \ +({ \ + int _avail = LDLM_LOCKREQ_HANDLES; \ + _avail -= (type == LDLM_ENQUEUE ? LDLM_ENQUEUE_CANCEL_OFF : 0); \ + sizeof(struct ldlm_request) + \ + (count - _avail > 0 ? count - _avail : 0) * \ + sizeof(struct lustre_handle); \ +}) + extern void lustre_swab_ldlm_request (struct ldlm_request *rq); struct ldlm_reply { diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 61dfc23..9c3ccca 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -28,6 +28,7 @@ struct obd_device; #define OBD_LDLM_DEVICENAME "ldlm" #define LDLM_DEFAULT_LRU_SIZE (100 * smp_num_cpus) +#define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000)) typedef enum { ELDLM_OK = 0, @@ -131,6 +132,18 @@ typedef enum { #define LDLM_FL_LOCK_PROTECT 0x8000000 #define LDLM_FL_LOCK_PROTECT_BIT 27 +/* It may happen that a client initiate 2 operations, e.g. unlink and mkdir, + * such that server send blocking ast for conflict locks to this client for + * the 1st operation, whereas the 2nd operation has canceled this lock and + * is waiting for rpc_lock which is taken by the 1st operation. + * LDLM_FL_BL_AST is to be set by ldlm_callback_handler() to the lock not allow + * ELC code to cancel it. + * LDLM_FL_BL_DONE is to be set by ldlm_cancel_callback() when lock cache is + * droped to let ldlm_callback_handler() return EINVAL to the server. It is + * used when ELC rpc is already prepared and is waiting for rpc_lock, too late + * to send a separate CANCEL rpc. */ +#define LDLM_FL_BL_AST 0x10000000 +#define LDLM_FL_BL_DONE 0x20000000 /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. 
*/ @@ -149,7 +162,7 @@ typedef enum { #define LCK_COMPAT_PR (LCK_COMPAT_PW | LCK_PR) #define LCK_COMPAT_CW (LCK_COMPAT_PW | LCK_CW) #define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW) -#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX) +#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX | LCK_GROUP) #define LCK_COMPAT_GROUP (LCK_GROUP | LCK_NL) extern ldlm_mode_t lck_compat_array[]; @@ -226,6 +239,7 @@ struct ldlm_namespace { spinlock_t ns_unused_lock; unsigned int ns_max_unused; + unsigned int ns_max_age; cfs_time_t ns_next_dump; /* next debug dump, jiffies */ atomic_t ns_locks; @@ -262,13 +276,6 @@ struct ldlm_lock { /* ldlm_lock_change_resource() can change this */ struct ldlm_resource *l_resource; - /* set once, no need to protect it */ - struct ldlm_lock *l_parent; - - /* protected by ns_hash_lock */ - struct list_head l_children; - struct list_head l_childof; - /* protected by ns_hash_lock. FIXME */ struct list_head l_lru; @@ -327,8 +334,6 @@ struct ldlm_lock { __u32 l_pid; /* pid which created this lock */ __u32 l_pidb; /* who holds LOCK_PROTECT_BIT */ - struct list_head l_tmp; - /* for ldlm_add_ast_work_item() */ struct list_head l_bl_ast; struct list_head l_cp_ast; @@ -360,10 +365,6 @@ struct ldlm_resource { struct semaphore lr_lvb_sem; __u32 lr_lvb_len; void *lr_lvb_data; - - /* lr_tmp holds a list head temporarily, during the building of a work - * queue. see ldlm_add_ast_work_item and ldlm_run_ast_work */ - void *lr_tmp; }; struct ldlm_ast_work { @@ -376,6 +377,16 @@ struct ldlm_ast_work { int w_datalen; }; +/* ldlm_enqueue parameters common */ +struct ldlm_enqueue_info { + __u32 ei_type; /* Type of the lock being enqueued. */ + __u32 ei_mode; /* Mode of the lock being enqueued. */ + void *ei_cb_bl; /* Different callbacks for lock handling (blocking, */ + void *ei_cb_cp; /* completion, glimpse) */ + void *ei_cb_gl; + void *ei_cbdata; /* Data to be passed into callbacks. 
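+ *
+ * The structure is filled by the caller of obd_enqueue()/ldlm_cli_enqueue().
+ * Illustrative sketch only (the callback values below are examples; any
+ * suitable blocking/completion/glimpse handlers may be used):
+ *
+ *     struct ldlm_enqueue_info einfo = { 0 };
+ *     einfo.ei_type   = LDLM_EXTENT;
+ *     einfo.ei_mode   = LCK_PR;
+ *     einfo.ei_cb_bl  = ldlm_blocking_ast;
+ *     einfo.ei_cb_cp  = ldlm_completion_ast;
+ *     einfo.ei_cb_gl  = ldlm_glimpse_ast;
+ *     einfo.ei_cbdata = data;
+ *     rc = ldlm_cli_enqueue(exp, &req, &einfo, res_id, &policy, &flags,
+ *                           NULL, 0, NULL, &lockh, 0);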
*/ +}; + extern struct obd_ops ldlm_obd_ops; extern char *ldlm_lockname[]; @@ -459,6 +470,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback, ldlm_blocking_callback, ldlm_glimpse_callback); int ldlm_handle_convert(struct ptlrpc_request *req); int ldlm_handle_cancel(struct ptlrpc_request *req); +int ldlm_request_cancel(struct ptlrpc_request *req, + struct ldlm_request *dlm_req, int first); int ldlm_del_waiting_lock(struct ldlm_lock *lock); int ldlm_refresh_waiting_lock(struct ldlm_lock *lock); int ldlm_get_ref(void); @@ -480,6 +493,18 @@ static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h) return __ldlm_handle2lock(h, 0); } +static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, + struct lustre_msg *m, int buf_idx, + int increase) +{ + if (res->lr_namespace->ns_lvbo && + res->lr_namespace->ns_lvbo->lvbo_update) { + return res->lr_namespace->ns_lvbo->lvbo_update(res, m, buf_idx, + increase); + } + return 0; +} + #define LDLM_LOCK_PUT(lock) \ do { \ /*LDLM_DEBUG((lock), "put");*/ \ @@ -493,6 +518,18 @@ do { \ lock; \ }) +#define ldlm_lock_list_put(head, member, count) \ +({ \ + struct ldlm_lock *_lock, *_next; \ + int c = count; \ + list_for_each_entry_safe(_lock, _next, head, member) { \ + list_del_init(&_lock->member); \ + LDLM_LOCK_PUT(_lock); \ + if (--c == 0) \ + break; \ + } \ +}) + struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock); void ldlm_lock_put(struct ldlm_lock *lock); void ldlm_lock_destroy(struct ldlm_lock *lock); @@ -549,13 +586,13 @@ int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp); int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data); int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **req, - struct ldlm_res_id res_id, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, int *flags, - ldlm_blocking_callback blocking, - ldlm_completion_callback completion, - ldlm_glimpse_callback glimpse, - void *data, void *lvb, __u32 lvb_len, void *lvb_swabber, + struct ldlm_enqueue_info *einfo, struct ldlm_res_id res_id, + ldlm_policy_data_t *policy, int *flags, + void *lvb, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh, int async); +struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, + int bufcount, int *size, + struct list_head *head, int count); int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode, int *flags, void *lvb, __u32 lvb_len, @@ -575,7 +612,15 @@ int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags); int ldlm_cli_cancel(struct lustre_handle *lockh); int ldlm_cli_cancel_unused(struct ldlm_namespace *, struct ldlm_res_id *, int flags, void *opaque); +int ldlm_cli_cancel_req(struct obd_export *exp, + struct list_head *head, int count); int ldlm_cli_join_lru(struct ldlm_namespace *, struct ldlm_res_id *, int join); +int ldlm_cancel_resource_local(struct ldlm_resource *res, + struct list_head *cancels, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + int lock_flags, int flags, void *opaque); +int ldlm_cli_cancel_list(struct list_head *head, int count, + struct ptlrpc_request *req, int off); /* mds/handler.c */ /* This has to be here because recursive inclusion sucks. 
*/ diff --git a/lustre/include/lustre_mds.h b/lustre/include/lustre_mds.h index 85da7a4..981e444 100644 --- a/lustre/include/lustre_mds.h +++ b/lustre/include/lustre_mds.h @@ -48,11 +48,14 @@ struct lustre_md { struct mdc_op_data { struct ll_fid fid1; struct ll_fid fid2; + struct ll_fid fid3; + struct ll_fid fid4; __u64 mod_time; const char *name; int namelen; __u32 create_mode; __u32 suppgids[2]; + void *data; }; struct mds_update_record { @@ -74,6 +77,7 @@ struct mds_update_record { __u32 ur_mode; __u32 ur_flags; struct lvfs_grp_hash_entry *ur_grp_entry; + struct ldlm_request *ur_dlm; }; /* file data for open files on MDS */ @@ -122,17 +126,10 @@ int mdc_intent_lock(struct obd_export *exp, struct lookup_intent *, int, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking, int extra_lock_flags); -int mdc_enqueue(struct obd_export *exp, - int lock_type, - struct lookup_intent *it, - int lock_mode, - struct mdc_op_data *data, - struct lustre_handle *lockh, - void *lmm, - int lmmlen, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, - void *cb_data, int extra_lock_flags); +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct mdc_op_data *data, + struct lustre_handle *lockh, void *lmm, int lmmlen, + int extra_lock_flags); /* mdc/mdc_request.c */ int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp); @@ -182,6 +179,9 @@ int mdc_rename(struct obd_export *exp, struct mdc_op_data *data, int mdc_sync(struct obd_export *exp, struct ll_fid *fid, struct ptlrpc_request **); int mdc_create_client(struct obd_uuid uuid, struct ptlrpc_client *cl); +int mdc_resource_get_unused(struct obd_export *exp, struct ll_fid *fid, + struct list_head *cancels, ldlm_mode_t mode, + __u64 bits); /* Store the generation of a newly-created inode in |req| for replay. */ void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff, diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 85073a3..af3f04b 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -129,31 +129,15 @@ struct obd_info; typedef int (*obd_enqueue_update_f)(struct obd_info *oinfo, int rc); -/* obd_enqueue parameters common for all levels (lov, osc). */ -struct obd_enqueue_info { - /* Flags used while lock handling. */ - int ei_flags; - /* Type of the lock being enqueued. */ - __u32 ei_type; - /* Mode of the lock being enqueued. */ - __u32 ei_mode; - /* Different callbacks for lock handling (blocking, completion, - glimpse */ - void *ei_cb_bl; - void *ei_cb_cp; - void *ei_cb_gl; - /* Data to be passed into callbacks. */ - void *ei_cbdata; - /* Request set for OSC async requests. */ - struct ptlrpc_request_set *ei_rqset; -}; - /* obd info for a particular level (lov, osc). */ struct obd_info { /* Lock policy. It keeps an extent which is specific for a particular * OSC. (e.g. lov_prep_enqueue_set initialises extent of the policy, * and osc_enqueue passes it into ldlm_lock_match & ldlm_cli_enqueue. */ ldlm_policy_data_t oi_policy; + /* Flags used while lock handling. The flags obtained on the enqueue + * request are set here, therefore they are request specific. */ + int oi_flags; /* Lock handle specific for every OSC lock. */ struct lustre_handle *oi_lockh; /* lsm data specific for every OSC. 
*/ @@ -939,7 +923,8 @@ struct obd_ops { int niocount, struct niobuf_local *local, struct obd_trans_info *oti, int rc); int (*o_enqueue)(struct obd_export *, struct obd_info *oinfo, - struct obd_enqueue_info *einfo); + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset); int (*o_match)(struct obd_export *, struct lov_stripe_md *, __u32 type, ldlm_policy_data_t *, __u32 mode, int *flags, void *data, struct lustre_handle *lockh); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index f77c399..d8efd12 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1115,30 +1115,30 @@ static inline int obd_iocontrol(unsigned int cmd, struct obd_export *exp, static inline int obd_enqueue_rqset(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo) { + struct ptlrpc_request_set *set = NULL; int rc; ENTRY; EXP_CHECK_OP(exp, enqueue); EXP_COUNTER_INCREMENT(exp, enqueue); - einfo->ei_rqset = ptlrpc_prep_set(); - if (einfo->ei_rqset == NULL) + set = ptlrpc_prep_set(); + if (set == NULL) RETURN(-ENOMEM); - rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo); + rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set); if (rc == 0) - rc = ptlrpc_set_wait(einfo->ei_rqset); - ptlrpc_set_destroy(einfo->ei_rqset); - einfo->ei_rqset = NULL; - + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); RETURN(rc); } static inline int obd_enqueue(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *set) { int rc; ENTRY; @@ -1146,7 +1146,7 @@ static inline int obd_enqueue(struct obd_export *exp, EXP_CHECK_OP(exp, enqueue); EXP_COUNTER_INCREMENT(exp, enqueue); - rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo); + rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set); RETURN(rc); } diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index 12ea558..d9accb5 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -31,7 +31,7 @@ struct osc_async_args { struct osc_enqueue_args { struct obd_export *oa_exp; struct obd_info *oa_oi; - struct obd_enqueue_info *oa_ei; + struct ldlm_enqueue_info*oa_ei; }; #endif diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 04c6509..f4a3007 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -151,6 +151,8 @@ extern int obd_race_state; #define OBD_FAIL_LDLM_RECOV_CLIENTS 0x30d #define OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT 0x30e #define OBD_FAIL_LDLM_GLIMPSE 0x30f +#define OBD_FAIL_LDLM_CANCEL_RACE 0x310 +#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE 0x311 #define OBD_FAIL_OSC 0x400 #define OBD_FAIL_OSC_BRW_READ_BULK 0x401 diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index ab4e8a4..7f299b4 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -347,12 +347,10 @@ destroylock: /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent - * - the caller has already initialized req->lr_tmp * - must call this function with the ns lock held * * If first_enq is 1 (ie, called from ldlm_lock_enqueue): * - blocking ASTs have not been sent - * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err, struct list_head *work_list) @@ -384,7 +382,8 @@ int ldlm_process_extent_lock(struct ldlm_lock 
*lock, int *flags, int first_enq, ldlm_resource_unlink_lock(lock); - ldlm_extent_policy(res, lock, flags); + if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) + ldlm_extent_policy(res, lock, flags); ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index ec2e76f..80f87e0 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -334,7 +334,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, /* XXX - if ldlm_lock_new() can sleep we should * release the ns_lock, allocate the new lock, * and restart processing this lock. */ - new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK, + new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK, lock->l_granted_mode, NULL, NULL, NULL, NULL, 0); if (!new2) { diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index d8e7c3b..c378c28 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -112,12 +112,10 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent - * - the caller has already initialized req->lr_tmp * - must call this function with the ns lock held * * If first_enq is 1 (ie, called from ldlm_lock_enqueue): * - blocking ASTs have not been sent - * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err, diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index fbc9c18..1b9ac7d 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -8,7 +8,12 @@ typedef enum { LDLM_SYNC, } ldlm_sync_t; +/* Cancel lru flag, it indicates we cancel aged locks. 
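+ * When this flag is passed to ldlm_cancel_lru_local() the scan does not stop
+ * once the requested number of locks has been gathered: unused locks older
+ * than ns_max_age keep being added to the cancel list.
+ * ldlm_prep_enqueue_req() uses it this way to fill the free handle slots of
+ * an enqueue request.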
*/ +#define LDLM_CANCEL_AGED 0x00000001 + int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync); +int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, + int count, int max, int flags); /* ldlm_resource.c */ int ldlm_resource_putref_locked(struct ldlm_resource *res); @@ -18,8 +23,7 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original, /* ldlm_lock.c */ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list); struct ldlm_lock * -ldlm_lock_create(struct ldlm_namespace *ns, - struct lustre_handle *parent_lock_handle, struct ldlm_res_id, +ldlm_lock_create(struct ldlm_namespace *ns, struct ldlm_res_id, ldlm_type_t type, ldlm_mode_t, ldlm_blocking_callback, ldlm_completion_callback, ldlm_glimpse_callback, void *data, __u32 lvb_len); @@ -38,7 +42,7 @@ void ldlm_lock_destroy_nolock(struct ldlm_lock *lock); /* ldlm_lockd.c */ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock); + struct ldlm_lock *lock, int flags); void ldlm_handle_bl_callback(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 7e43e3f..12995ee 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -150,9 +150,6 @@ void ldlm_lock_put(struct ldlm_lock *lock) LASSERT(list_empty(&lock->l_res_link)); LASSERT(list_empty(&lock->l_pending_chain)); - if (lock->l_parent) - LDLM_LOCK_PUT(lock->l_parent); - atomic_dec(&res->lr_namespace->ns_locks); ldlm_resource_putref(res); lock->l_resource = NULL; @@ -204,12 +201,6 @@ int ldlm_lock_destroy_internal(struct ldlm_lock *lock) { ENTRY; - if (!list_empty(&lock->l_children)) { - LDLM_ERROR(lock, "still has children (%p)!", - lock->l_children.next); - ldlm_lock_dump(D_ERROR, lock, 0); - LBUG(); - } if (lock->l_readers || lock->l_writers) { LDLM_ERROR(lock, "lock still has references"); ldlm_lock_dump(D_ERROR, lock, 0); @@ -289,8 +280,7 @@ static void lock_handle_addref(void *lock) * after return, ldlm_*_put the resource and parent * returns: lock with refcount 2 - one for current caller and one for remote */ -static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, - struct ldlm_resource *resource) +static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) { struct ldlm_lock *lock; ENTRY; @@ -305,12 +295,10 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, lock->l_resource = ldlm_resource_getref(resource); atomic_set(&lock->l_refc, 2); - CFS_INIT_LIST_HEAD(&lock->l_children); CFS_INIT_LIST_HEAD(&lock->l_res_link); CFS_INIT_LIST_HEAD(&lock->l_lru); CFS_INIT_LIST_HEAD(&lock->l_export_chain); CFS_INIT_LIST_HEAD(&lock->l_pending_chain); - CFS_INIT_LIST_HEAD(&lock->l_tmp); CFS_INIT_LIST_HEAD(&lock->l_bl_ast); CFS_INIT_LIST_HEAD(&lock->l_cp_ast); cfs_waitq_init(&lock->l_waitq); @@ -322,14 +310,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, lock->l_sl_policy.next = NULL; atomic_inc(&resource->lr_namespace->ns_locks); - - if (parent != NULL) { - spin_lock(&resource->lr_namespace->ns_hash_lock); - lock->l_parent = LDLM_LOCK_GET(parent); - list_add(&lock->l_childof, &parent->l_children); - spin_unlock(&resource->lr_namespace->ns_hash_lock); - } - CFS_INIT_LIST_HEAD(&lock->l_handle.h_link); class_handle_hash(&lock->l_handle, lock_handle_addref); @@ -606,7 +586,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_lock_remove_from_lru(lock); unlock_res_and_lock(lock); if ((lock->l_flags & 
LDLM_FL_ATOMIC_CB) || - ldlm_bl_to_thread(ns, NULL, lock) != 0) + ldlm_bl_to_thread(ns, NULL, lock, 0) != 0) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT && !lock->l_readers && !lock->l_writers && @@ -615,12 +595,16 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) * reference, put it on the LRU. */ LASSERT(list_empty(&lock->l_lru)); LASSERT(ns->ns_nr_unused >= 0); + lock->l_last_used = cfs_time_current(); spin_lock(&ns->ns_unused_lock); list_add_tail(&lock->l_lru, &ns->ns_unused_list); ns->ns_nr_unused++; spin_unlock(&ns->ns_unused_lock); unlock_res_and_lock(lock); - ldlm_cancel_lru(ns, LDLM_ASYNC); + /* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported + * by the server, otherwise, it is done on enqueue. */ + if (!exp_connect_cancelset(lock->l_conn_export)) + ldlm_cancel_lru(ns, LDLM_ASYNC); } else { unlock_res_and_lock(lock); } @@ -1068,7 +1052,6 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, /* Returns a referenced lock */ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, - struct lustre_handle *parent_lock_handle, struct ldlm_res_id res_id, ldlm_type_t type, ldlm_mode_t mode, ldlm_blocking_callback blocking, @@ -1076,24 +1059,16 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, ldlm_glimpse_callback glimpse, void *data, __u32 lvb_len) { - struct ldlm_resource *res, *parent_res = NULL; - struct ldlm_lock *lock, *parent_lock = NULL; + struct ldlm_lock *lock; + struct ldlm_resource *res; ENTRY; - if (parent_lock_handle) { - parent_lock = ldlm_handle2lock(parent_lock_handle); - if (parent_lock) - parent_res = parent_lock->l_resource; - } - - res = ldlm_resource_get(ns, parent_res, res_id, type, 1); + res = ldlm_resource_get(ns, NULL, res_id, type, 1); if (res == NULL) RETURN(NULL); - lock = ldlm_lock_new(parent_lock, res); + lock = ldlm_lock_new(res); ldlm_resource_putref(res); - if (parent_lock != NULL) - LDLM_LOCK_PUT(parent_lock); if (lock == NULL) RETURN(NULL); @@ -1395,6 +1370,7 @@ void ldlm_cancel_callback(struct ldlm_lock *lock) LDLM_DEBUG(lock, "no blocking ast"); } } + lock->l_flags |= LDLM_FL_BL_DONE; } void ldlm_unlink_lock_skiplist(struct ldlm_lock *req) @@ -1510,6 +1486,8 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp) spin_unlock(&exp->exp_ldlm_data.led_lock); LDLM_DEBUG(lock, "export %p", exp); + ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_lock_cancel(lock); ldlm_reprocess_all(res); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index fe9add7..135c2bd 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -98,6 +98,7 @@ struct ldlm_bl_work_item { struct ldlm_namespace *blwi_ns; struct ldlm_lock_desc blwi_ld; struct ldlm_lock *blwi_lock; + int blwi_flags; }; #ifdef __KERNEL__ @@ -532,7 +533,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, instant_cancel = 1; body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; body->lock_desc = *desc; body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS); @@ -560,9 +561,15 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, rc = ptlrpc_queue_wait(req); OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); } - if (rc != 0) + if (rc != 0) { + /* If client canceled the lock but the cancel has not been + * recieved yet, we need to update lvbo to have the proper + * attributes cached. 
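+                 * The -EINVAL reply typically comes from a client whose
+                 * ldlm_callback_handler() found the lock already marked
+                 * LDLM_FL_CANCELING|LDLM_FL_BL_DONE: the cancel is riding
+                 * in an ELC RPC that has not arrived yet, so refresh the
+                 * lvb (e.g. size/blocks on the OST) before handling the
+                 * AST error below.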
*/ + if (rc == -EINVAL) + ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1); rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); - + } + ptlrpc_req_finished(req); /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */ @@ -607,7 +614,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) RETURN(-ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; body->lock_flags = flags; ldlm_lock2desc(lock, &body->lock_desc); @@ -689,7 +696,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) RETURN(-ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); lock_res_and_lock(lock); @@ -711,8 +718,8 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) else if (rc != 0) rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); else - rc = res->lr_namespace->ns_lvbo->lvbo_update - (res, req->rq_repmsg, REPLY_REC_OFF, 1); + rc = ldlm_res_lvbo_update(res, req->rq_repmsg, + REPLY_REC_OFF, 1); ptlrpc_req_finished(req); RETURN(rc); } @@ -767,6 +774,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, GOTO(out, rc = -EFAULT); } + ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF); flags = dlm_req->lock_flags; LASSERT(req->rq_export); @@ -821,7 +829,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, if (flags & LDLM_FL_REPLAY) { lock = find_existing_lock(req->rq_export, - &dlm_req->lock_handle1); + &dlm_req->lock_handle[0]); if (lock != NULL) { DEBUG_REQ(D_HA, req, "found existing lock cookie "LPX64, lock->l_handle.h_cookie); @@ -830,7 +838,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, } /* The lock's callback data might be set in the policy function */ - lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2, + lock = ldlm_lock_create(obddev->obd_namespace, dlm_req->lock_desc.l_resource.lr_name, dlm_req->lock_desc.l_resource.lr_type, dlm_req->lock_desc.l_req_mode, @@ -840,7 +848,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, GOTO(out, rc = -ENOMEM); do_gettimeofday(&lock->l_enqueued_time); - lock->l_remote_handle = dlm_req->lock_handle1; + lock->l_remote_handle = dlm_req->lock_handle[0]; LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); @@ -1034,7 +1042,7 @@ int ldlm_handle_convert(struct ptlrpc_request *req) sizeof(*dlm_rep)); dlm_rep->lock_flags = dlm_req->lock_flags; - lock = ldlm_handle2lock(&dlm_req->lock_handle1); + lock = ldlm_handle2lock(&dlm_req->lock_handle[0]); if (!lock) { req->rq_status = EINVAL; } else { @@ -1065,11 +1073,62 @@ int ldlm_handle_convert(struct ptlrpc_request *req) RETURN(0); } +/* Cancel all the locks, which handles are packed into ldlm_request */ +int ldlm_request_cancel(struct ptlrpc_request *req, + struct ldlm_request *dlm_req, int first) +{ + struct ldlm_resource *res, *pres = NULL; + struct ldlm_lock *lock; + int i, count, done = 0; + ENTRY; + + LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, " + "starting at %d", dlm_req->lock_count, first); + count = dlm_req->lock_count ? dlm_req->lock_count : 1; + if (first >= count) + RETURN(0); + + /* There is no lock on the server at the replay time, + * skip lock cancelling to make replay tests to pass. 
*/ + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + RETURN(0); + + for (i = first; i < count; i++) { + lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); + if (!lock) { + LDLM_DEBUG_NOLOCK("server-side cancel handler stale " + "lock (cookie "LPU64")", + dlm_req->lock_handle[i].cookie); + continue; + } + + done++; + res = lock->l_resource; + if (res != pres) { + if (pres != NULL) { + ldlm_reprocess_all(pres); + ldlm_resource_putref(pres); + } + if (res != NULL) { + ldlm_resource_getref(res); + ldlm_res_lvbo_update(res, NULL, 0, 1); + } + pres = res; + } + ldlm_lock_cancel(lock); + LDLM_LOCK_PUT(lock); + } + if (pres != NULL) { + ldlm_reprocess_all(pres); + ldlm_resource_putref(pres); + } + LDLM_DEBUG_NOLOCK("server-side cancel handler END"); + RETURN(done); +} + int ldlm_handle_cancel(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock *lock; - struct ldlm_resource *res; int rc; ENTRY; @@ -1090,40 +1149,12 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) RETURN(-ENOMEM); } - lock = ldlm_handle2lock(&dlm_req->lock_handle1); - if (!lock) { - CERROR("received cancel for unknown lock cookie "LPX64 - " from client %s id %s\n", - dlm_req->lock_handle1.cookie, - req->rq_export->exp_client_uuid.uuid, - libcfs_id2str(req->rq_peer)); - LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock " - "(cookie "LPU64")", - dlm_req->lock_handle1.cookie); + if (!ldlm_request_cancel(req, dlm_req, 0)) req->rq_status = ESTALE; - } else { - LDLM_DEBUG(lock, "server-side cancel handler START"); - res = lock->l_resource; - if (res && res->lr_namespace->ns_lvbo && - res->lr_namespace->ns_lvbo->lvbo_update) { - (void)res->lr_namespace->ns_lvbo->lvbo_update - (res, NULL, 0, 1); - //(res, req->rq_reqmsg, 1, 1); - } - - ldlm_lock_cancel(lock); - req->rq_status = rc; - } - + if (ptlrpc_reply(req) != 0) LBUG(); - if (lock) { - ldlm_reprocess_all(lock->l_resource); - LDLM_DEBUG(lock, "server-side cancel handler END"); - LDLM_LOCK_PUT(lock); - } - RETURN(0); } @@ -1252,7 +1283,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, cfs_time_after(cfs_time_current(), cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) { unlock_res_and_lock(lock); - if (ldlm_bl_to_thread(ns, NULL, lock)) + if (ldlm_bl_to_thread(ns, NULL, lock, 0)) ldlm_handle_bl_callback(ns, NULL, lock); EXIT; @@ -1275,7 +1306,7 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) } int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock) + struct ldlm_lock *lock, int flags) { #ifdef __KERNEL__ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; @@ -1290,6 +1321,7 @@ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, if (ld != NULL) blwi->blwi_ld = *ld; blwi->blwi_lock = lock; + blwi->blwi_flags = flags; spin_lock(&blp->blp_lock); list_add_tail(&blwi->blwi_entry, &blp->blp_list); @@ -1330,7 +1362,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) lustre_swab_ldlm_request); if (dlm_req != NULL) CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n", - dlm_req->lock_handle1.cookie); + dlm_req->lock_handle[0].cookie); ldlm_callback_reply(req, -ENOTCONN); RETURN(0); @@ -1402,10 +1434,10 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) RETURN (0); } - lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1); + lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]); if (!lock) { - CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n", - dlm_req->lock_handle1.cookie); + CDEBUG(D_DLMTRACE, 
"callback on lock "LPX64" - lock " + "disappeared\n", dlm_req->lock_handle[0].cookie); ldlm_callback_reply(req, -EINVAL); RETURN(0); } @@ -1413,6 +1445,22 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */ lock_res_and_lock(lock); lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS); + if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) { + /* If somebody cancels locks and cache is already droped, + * we can tell the server we have no lock. Otherwise, we + * should send cancel after dropping the cache. */ + if ((lock->l_flags & LDLM_FL_CANCELING) && + (lock->l_flags & LDLM_FL_BL_DONE)) { + LDLM_DEBUG(lock, "callback on lock " + LPX64" - lock disappeared\n", + dlm_req->lock_handle[0].cookie); + LDLM_LOCK_PUT(lock); + unlock_res_and_lock(lock); + ldlm_callback_reply(req, -EINVAL); + RETURN(0); + } + lock->l_flags |= LDLM_FL_BL_AST; + } unlock_res_and_lock(lock); /* We want the ost thread to get this reply so that it can respond @@ -1429,7 +1477,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) CDEBUG(D_INODE, "blocking ast\n"); if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) ldlm_callback_reply(req, 0); - if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock)) + if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0)) ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock); break; case LDLM_CP_CALLBACK: @@ -1470,7 +1518,8 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) sizeof(*dlm_req), lustre_swab_ldlm_request); if (dlm_req != NULL) - ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1); + ldlm_lock_dump_handle(D_ERROR, + &dlm_req->lock_handle[0]); ldlm_callback_reply(req, -ENOTCONN); RETURN(0); @@ -1548,8 +1597,22 @@ static int ldlm_bl_thread_main(void *arg) if (blwi->blwi_ns == NULL) break; - ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, - blwi->blwi_lock); + if (blwi->blwi_flags == LDLM_FL_CANCELING) { + /* The special case when we cancel locks in lru + * asynchronously, then we first remove the lock from + * l_bl_ast explicitely in ldlm_cancel_lru before + * sending it to this thread. Thus lock is marked + * LDLM_FL_CANCELING, and already cancelled locally. 
*/ + CFS_LIST_HEAD(head); + LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast)); + list_add(&blwi->blwi_lock->l_bl_ast, &head); + ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export, + &head, 1); + LDLM_LOCK_PUT(blwi->blwi_lock); + } else { + ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, + blwi->blwi_lock); + } OBD_FREE(blwi, sizeof(*blwi)); } @@ -1827,18 +1890,22 @@ EXPORT_SYMBOL(ldlm_completion_ast); EXPORT_SYMBOL(ldlm_blocking_ast); EXPORT_SYMBOL(ldlm_glimpse_ast); EXPORT_SYMBOL(ldlm_expired_completion_wait); +EXPORT_SYMBOL(ldlm_prep_enqueue_req); EXPORT_SYMBOL(ldlm_cli_convert); EXPORT_SYMBOL(ldlm_cli_enqueue); EXPORT_SYMBOL(ldlm_cli_enqueue_fini); EXPORT_SYMBOL(ldlm_cli_enqueue_local); EXPORT_SYMBOL(ldlm_cli_cancel); EXPORT_SYMBOL(ldlm_cli_cancel_unused); +EXPORT_SYMBOL(ldlm_cli_cancel_req); EXPORT_SYMBOL(ldlm_cli_join_lru); EXPORT_SYMBOL(ldlm_replay_locks); EXPORT_SYMBOL(ldlm_resource_foreach); EXPORT_SYMBOL(ldlm_namespace_foreach); EXPORT_SYMBOL(ldlm_namespace_foreach_res); EXPORT_SYMBOL(ldlm_resource_iterate); +EXPORT_SYMBOL(ldlm_cancel_resource_local); +EXPORT_SYMBOL(ldlm_cli_cancel_list); /* ldlm_lockd.c */ EXPORT_SYMBOL(ldlm_server_blocking_ast); @@ -1846,6 +1913,7 @@ EXPORT_SYMBOL(ldlm_server_completion_ast); EXPORT_SYMBOL(ldlm_server_glimpse_ast); EXPORT_SYMBOL(ldlm_handle_enqueue); EXPORT_SYMBOL(ldlm_handle_cancel); +EXPORT_SYMBOL(ldlm_request_cancel); EXPORT_SYMBOL(ldlm_handle_convert); EXPORT_SYMBOL(ldlm_del_waiting_lock); EXPORT_SYMBOL(ldlm_get_ref); diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index b28d89e..71778cd 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -87,12 +87,10 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent - * - the caller has already initialized req->lr_tmp * - must call this function with the resource lock held * * If first_enq is 1 (ie, called from ldlm_lock_enqueue): * - blocking ASTs have not been sent - * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the resource lock held */ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err, struct list_head *work_list) diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index a24ce4f..ba7f180 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -242,7 +242,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id, LBUG(); } - lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, + lock = ldlm_lock_create(ns, res_id, type, mode, blocking, completion, glimpse, data, lvb_len); if (!lock) GOTO(out_nolock, err = -ENOMEM); @@ -454,6 +454,69 @@ cleanup: return rc; } +/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into + * a single page on the send/receive side. XXX: 512 should be changed + * to more adequate value. */ +#define ldlm_req_handles_avail(exp, size, bufcount, off) \ +({ \ + int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \ + int _s = size[DLM_LOCKREQ_OFF]; \ + size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \ + _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \ + bufcount, size); \ + _avail /= sizeof(struct lustre_handle); \ + _avail += LDLM_LOCKREQ_HANDLES - off; \ + size[DLM_LOCKREQ_OFF] = _s; \ + _avail; \ +}) + +/* Cancel lru locks and pack them into the enqueue request. Pack there the given + * @count locks in @cancels. 
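+ *
+ * @cancels may be NULL and @count 0, in which case only lru locks are
+ * gathered to fill the free handle slots, as ldlm_cli_enqueue() does:
+ *
+ *     req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+ *
+ * A caller that has already collected conflicting locks (for instance via
+ * ldlm_cancel_resource_local() or mdc_resource_get_unused()) passes the
+ * list and its length instead, e.g. (illustrative only):
+ *
+ *     req = ldlm_prep_enqueue_req(exp, bufcount, size, &cancels, count);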
*/ +struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, + int bufcount, int *size, + struct list_head *cancels, + int count) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_request *dlm = NULL; + struct ptlrpc_request *req; + CFS_LIST_HEAD(head); + ENTRY; + + if (cancels == NULL) + cancels = &head; + if (exp_connect_cancelset(exp)) { + /* Estimate the amount of free space in the request. */ + int avail = ldlm_req_handles_avail(exp, size, bufcount, + LDLM_ENQUEUE_CANCEL_OFF); + LASSERT(avail >= count); + + /* Cancel lru locks here _only_ if the server supports + * EARLY_CANCEL. Otherwise we have to send extra CANCEL + * rpc right on enqueue, what will make it slower, vs. + * asynchronous rpc in blocking thread. */ + count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count, + LDLM_CANCEL_AGED); + size[DLM_LOCKREQ_OFF] = + ldlm_request_bufsize(count, LDLM_ENQUEUE); + } + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, + LDLM_ENQUEUE, bufcount, size, NULL); + if (exp_connect_cancelset(exp) && req) { + dlm = lustre_msg_buf(req->rq_reqmsg, + DLM_LOCKREQ_OFF, sizeof(*dlm)); + /* Skip first lock handler in ldlm_request_pack(), this method + * will incrment @lock_count according to the lock handle amount + * actually written to the buffer. */ + dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF; + } + if (req) + ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF); + else + ldlm_lock_list_put(cancels, l_bl_ast, count); + RETURN(req); +} + /* If a request has some specific initialisation it is passed in @reqp, * otherwise it is created in ldlm_cli_enqueue. * @@ -461,12 +524,9 @@ cleanup: * request was created in ldlm_cli_enqueue and it is the async request, * pass it to the caller in @reqp. */ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, - struct ldlm_res_id res_id, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, int *flags, - ldlm_blocking_callback blocking, - ldlm_completion_callback completion, - ldlm_glimpse_callback glimpse, - void *data, void *lvb, __u32 lvb_len, void *lvb_swabber, + struct ldlm_enqueue_info *einfo, struct ldlm_res_id res_id, + ldlm_policy_data_t *policy, int *flags, + void *lvb, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh, int async) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; @@ -491,12 +551,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_DEBUG(lock, "client-side enqueue START"); LASSERT(exp == lock->l_conn_export); } else { - lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, - completion, glimpse, data, lvb_len); + lock = ldlm_lock_create(ns, res_id, einfo->ei_type, + einfo->ei_mode, einfo->ei_cb_bl, + einfo->ei_cb_cp, einfo->ei_cb_gl, + einfo->ei_cbdata, lvb_len); if (lock == NULL) RETURN(-ENOMEM); /* for the local lock, add the reference */ - ldlm_lock_addref_internal(lock, mode); + ldlm_lock_addref_internal(lock, einfo->ei_mode); ldlm_lock2handle(lock, lockh); lock->l_lvb_swabber = lvb_swabber; if (policy != NULL) { @@ -505,8 +567,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, * descriptor (ldlm_lock2desc() below) but use an * inodebits lock internally with both bits set. 
*/ - if (type == LDLM_IBITS && !(exp->exp_connect_flags & - OBD_CONNECT_IBITS)) + if (einfo->ei_type == LDLM_IBITS && + !(exp->exp_connect_flags & OBD_CONNECT_IBITS)) lock->l_policy_data.l_inodebits.bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; @@ -514,7 +576,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, lock->l_policy_data = *policy; } - if (type == LDLM_EXTENT) + if (einfo->ei_type == LDLM_EXTENT) lock->l_req_extent = policy->l_extent; LDLM_DEBUG(lock, "client-side enqueue START"); } @@ -522,10 +584,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, /* lock not sent to server yet */ if (reqp == NULL || *reqp == NULL) { - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (req == NULL) { - failed_lock_cleanup(ns, lock, lockh, mode); + failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode); LDLM_LOCK_PUT(lock); RETURN(-ENOMEM); } @@ -534,7 +595,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, *reqp = req; } else { req = *reqp; - LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) == + LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >= sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n", DLM_LOCKREQ_OFF, lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF), @@ -543,13 +604,13 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, lock->l_conn_export = exp; lock->l_export = NULL; - lock->l_blocking_ast = blocking; + lock->l_blocking_ast = einfo->ei_cb_bl; /* Dump lock data into the request buffer */ body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); ldlm_lock2desc(lock, &body->lock_desc); body->lock_flags = *flags; - body->lock_handle1 = *lockh; + body->lock_handle[0] = *lockh; /* Continue as normal. */ if (!req_passed_in) { @@ -562,7 +623,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where * [i_size, OBD_OBJECT_EOF] lock is taken. */ - LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT || + LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT || policy->l_extent.end == OBD_OBJECT_EOF)); if (async) { @@ -572,9 +633,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_DEBUG(lock, "sending request"); rc = ptlrpc_queue_wait(req); - rc = ldlm_cli_enqueue_fini(exp, req, type, policy ? 1 : 0, - mode, flags, lvb, lvb_len, lvb_swabber, - lockh, rc); + rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0, + einfo->ei_mode, flags, lvb, lvb_len, + lvb_swabber, lockh, rc); if (!req_passed_in && req != NULL) { ptlrpc_req_finished(req); @@ -643,7 +704,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) GOTO(out, rc = -ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + body->lock_handle[0] = lock->l_remote_handle; body->lock_desc.l_req_mode = new_mode; body->lock_flags = *flags; @@ -686,24 +747,18 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) return rc; } -int ldlm_cli_cancel(struct lustre_handle *lockh) +/* Cancel locks locally. + * Returns: + * LDLM_FL_LOCAL_ONLY if tere is no need in a CANCEL rpc to the server; + * LDLM_FL_CANCELING otherwise; + * LDLM_FL_BL_AST if there is a need in a separate CANCEL rpc. 
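+ *
+ * Callers act on the result as follows: LDLM_FL_CANCELING locks are batched
+ * into one CANCEL RPC or packed into another request by ELC, LDLM_FL_BL_AST
+ * locks (a blocking ast has already arrived for them) are sent in their own
+ * immediate CANCEL RPC, and LDLM_FL_LOCAL_ONLY locks are dropped from the
+ * cancel list without any RPC.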
*/ +static int ldlm_cli_cancel_local(struct ldlm_lock *lock) { - struct ptlrpc_request *req; - struct ldlm_lock *lock; - struct ldlm_request *body; - int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(*body) }; - int rc = 0; + int rc = LDLM_FL_LOCAL_ONLY; ENTRY; - - /* concurrent cancels on the same handle can happen */ - lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); - if (lock == NULL) - RETURN(0); - + if (lock->l_conn_export) { int local_only; - struct obd_import *imp; LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ @@ -712,26 +767,104 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) local_only = (lock->l_flags & (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); ldlm_cancel_callback(lock); + rc = (lock->l_flags & LDLM_FL_BL_AST) ? + LDLM_FL_BL_AST : LDLM_FL_CANCELING; unlock_res_and_lock(lock); if (local_only) { - CDEBUG(D_INFO, "not sending request (at caller's " + CDEBUG(D_DLMTRACE, "not sending request (at caller's " "instruction)\n"); - goto local_cancel; + rc = LDLM_FL_LOCAL_ONLY; } + ldlm_lock_cancel(lock); + } else { + if (lock->l_resource->lr_namespace->ns_client) { + LDLM_ERROR(lock, "Trying to cancel local lock"); + LBUG(); + } + LDLM_DEBUG(lock, "server-side local cancel"); + ldlm_lock_cancel(lock); + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "server-side local cancel handler END"); + } + + RETURN(rc); +} + +/* Pack @count locks in @head into ldlm_request buffer at the offset @off, + of the request @req. */ +static void ldlm_cancel_pack(struct ptlrpc_request *req, int off, + struct list_head *head, int count) +{ + struct ldlm_request *dlm; + struct ldlm_lock *lock; + int max, packed = 0; + ENTRY; + + dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm)); + LASSERT(dlm != NULL); + + /* Check the room in the request buffer. */ + max = lustre_msg_buflen(req->rq_reqmsg, off) - + sizeof(struct ldlm_request); + max /= sizeof(struct lustre_handle); + max += LDLM_LOCKREQ_HANDLES; + LASSERT(max >= dlm->lock_count + count); + + /* XXX: it would be better to pack lock handles grouped by resource. + * so that the server cancel would call filter_lvbo_update() less + * frequently. */ + list_for_each_entry(lock, head, l_bl_ast) { + if (!count--) + break; + LASSERT(lock->l_conn_export); + /* Pack the lock handle to the given request buffer. */ + LDLM_DEBUG(lock, "packing"); + dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle; + packed++; + } + CDEBUG(D_DLMTRACE, "%d locks packed\n", packed); + EXIT; +} + +/* Prepare and send a batched cancel rpc, it will include count lock handles + * of locks given in @head. 
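+ *
+ * @count is clamped to the number of handles fitting into one CANCEL request
+ * (see ldlm_req_handles_avail()); on success the number of handles actually
+ * sent is returned. The single-lock pattern used throughout this file is:
+ *
+ *     CFS_LIST_HEAD(head);
+ *     list_add(&lock->l_bl_ast, &head);
+ *     ldlm_cli_cancel_req(lock->l_conn_export, &head, 1);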
*/ +int ldlm_cli_cancel_req(struct obd_export *exp, + struct list_head *cancels, int count) +{ + struct ptlrpc_request *req = NULL; + struct ldlm_request *body; + int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_LOCKREQ_OFF] = sizeof(*body) }; + struct obd_import *imp; + int free, sent = 0; + int rc = 0; + ENTRY; + + LASSERT(exp != NULL); + LASSERT(count > 0); - restart: - imp = class_exp2cliimp(lock->l_conn_export); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE)) + RETURN(count); + + free = ldlm_req_handles_avail(exp, size, 2, 0); + if (count > free) + count = free; + + size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL); + while (1) { + imp = class_exp2cliimp(exp); if (imp == NULL || imp->imp_invalid) { CDEBUG(D_HA, "skipping cancel on invalid import %p\n", imp); - goto local_cancel; + break; } req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); + req->rq_no_resend = 1; /* XXX FIXME bug 249 */ @@ -740,85 +873,119 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); - body->lock_handle1 = lock->l_remote_handle; + ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count); ptlrpc_req_set_repsize(req, 1, NULL); rc = ptlrpc_queue_wait(req); if (rc == ESTALE) { - CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync " - "-- not fatal, flags %x\n", + CDEBUG(D_DLMTRACE, "client/server (nid %s) " + "out of sync -- not fatal\n", libcfs_nid2str(req->rq_import-> - imp_connection->c_peer.nid), - lock->l_flags); + imp_connection->c_peer.nid)); } else if (rc == -ETIMEDOUT) { ptlrpc_req_finished(req); - GOTO(restart, rc); + continue; } else if (rc != ELDLM_OK) { CERROR("Got rc %d from cancel RPC: canceling " "anyway\n", rc); + break; } + sent = count; + break; + } + + ptlrpc_req_finished(req); + EXIT; +out: + return sent ? sent : rc; +} - ptlrpc_req_finished(req); - local_cancel: - ldlm_lock_cancel(lock); - } else { - if (lock->l_resource->lr_namespace->ns_client) { - LDLM_ERROR(lock, "Trying to cancel local lock"); - LBUG(); - } - LDLM_DEBUG(lock, "client-side local cancel"); - ldlm_lock_cancel(lock); - ldlm_reprocess_all(lock->l_resource); - LDLM_DEBUG(lock, "client-side local cancel handler END"); +int ldlm_cli_cancel(struct lustre_handle *lockh) +{ + struct ldlm_lock *lock; + CFS_LIST_HEAD(head); + int rc = 0; + ENTRY; + + /* concurrent cancels on the same handle can happen */ + lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); + if (lock == NULL) { + LDLM_DEBUG_NOLOCK("lock is already being destroyed\n"); + RETURN(0); } + + rc = ldlm_cli_cancel_local(lock); + if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) + GOTO(out, rc); + list_add(&lock->l_bl_ast, &head); + rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1); EXIT; out: LDLM_LOCK_PUT(lock); - return rc; + return rc < 0 ? rc : 0; } - -/* when called with LDLM_ASYNC the blocking callback will be handled - * in a thread and this function will return after the thread has been - * asked to call the callback. when called with LDLM_SYNC the blocking - * callback will be performed in this function. */ -int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) +/* - Free space in lru for @count new locks, + * redundant unused locks are canceled locally; + * - also cancel locally unused aged locks; + * - do not cancel more than @max locks; + * - GET the found locks and add them into the @cancels list. 
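+ *
+ * The target is @count plus whatever is needed to bring the lru back under
+ * ns_max_unused; e.g. with ns_nr_unused == 110, ns_max_unused == 100 and
+ * @count == 5, about 15 locks are gathered (more if LDLM_CANCEL_AGED finds
+ * older ones, fewer if @max caps the scan; @max == 0 means no cap).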
+ * + * A client lock can be added to the l_bl_ast list only when it is + * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing CANCEL. + * There are the following use cases: ldlm_cancel_resource_local(), + * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check&set this + * flag properly. As any attempt to cancel a lock rely on this flag, + * l_bl_ast list is accessed later without any special locking. */ +int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, + int count, int max, int flags) { + cfs_time_t cur = cfs_time_current(); struct ldlm_lock *lock, *next; - int count, rc = 0; - CFS_LIST_HEAD(cblist); + int rc, added = 0, left; ENTRY; -#ifndef __KERNEL__ - sync = LDLM_SYNC; /* force to be sync in user space */ -#endif - spin_lock(&ns->ns_unused_lock); - count = ns->ns_nr_unused - ns->ns_max_unused; - - if (count <= 0) { - spin_unlock(&ns->ns_unused_lock); - RETURN(0); - } - + count += ns->ns_nr_unused - ns->ns_max_unused; while (!list_empty(&ns->ns_unused_list)) { - struct list_head *tmp = ns->ns_unused_list.next; - lock = list_entry(tmp, struct ldlm_lock, l_lru); - LASSERT(!lock->l_readers && !lock->l_writers); + if (max && added >= max) + break; + list_for_each_entry(lock, &ns->ns_unused_list, l_lru) { + /* somebody is already doing CANCEL or there is a + * blocking request will send cancel. */ + if (!(lock->l_flags & LDLM_FL_CANCELING) && + !(lock->l_flags & LDLM_FL_BL_AST)) + break; + } + if (&lock->l_lru == &ns->ns_unused_list) + break; + + if ((added >= count) && + (!(flags & LDLM_CANCEL_AGED) || + cfs_time_before_64(cur, (__u64)ns->ns_max_age + + lock->l_last_used))) + break; + LDLM_LOCK_GET(lock); /* dropped by bl thread */ spin_unlock(&ns->ns_unused_lock); lock_res_and_lock(lock); - if (ldlm_lock_remove_from_lru(lock) == 0) { - /* other thread is removing lock from lru */ + /* Check flags again under the lock. */ + if ((lock->l_flags & LDLM_FL_CANCELING) || + (lock->l_flags & LDLM_FL_BL_AST) || + (ldlm_lock_remove_from_lru(lock) == 0)) { + /* other thread is removing lock from lru or + * somebody is already doing CANCEL or + * there is a blocking request which will send + * cancel by itseft. */ unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); spin_lock(&ns->ns_unused_lock); continue; - } + LASSERT(!lock->l_readers && !lock->l_writers); /* If we have chosen to canecl this lock voluntarily, we better send cancel notification to server, so that it frees @@ -832,59 +999,96 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) * the lock can accumulate no more readers/writers. Since * readers and writers are already zero here, ldlm_lock_decref * won't see this flag and call l_blocking_ast */ - lock->l_flags |= LDLM_FL_CBPENDING; - + lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING; /* We can't re-add to l_lru as it confuses the refcounting in * ldlm_lock_remove_from_lru() if an AST arrives after we drop - * ns_lock below. We use l_tmp and can't use l_pending_chain as - * it is used both on server and client nevertheles bug 5666 + * ns_lock below. We use l_bl_ast and can't use l_pending_chain + * as it is used both on server and client nevertheles bug 5666 * says it is used only on server. 
--umka */ - list_add(&lock->l_tmp, &cblist); - unlock_res_and_lock(lock); - - LDLM_LOCK_GET(lock); /* to hold lock after bl thread */ - if (sync == LDLM_ASYNC && (ldlm_bl_to_thread(ns, NULL, lock) == 0)) { - lock_res_and_lock(lock); - list_del_init(&lock->l_tmp); - unlock_res_and_lock(lock); - } - LDLM_LOCK_PUT(lock); + LASSERT(list_empty(&lock->l_bl_ast)); + list_add(&lock->l_bl_ast, cancels); + unlock_res_and_lock(lock); spin_lock(&ns->ns_unused_lock); - - if (--count == 0) - break; + added++; } spin_unlock(&ns->ns_unused_lock); - list_for_each_entry_safe(lock, next, &cblist, l_tmp) { - list_del_init(&lock->l_tmp); - ldlm_handle_bl_callback(ns, NULL, lock); - } + /* Handle only @added inserted locks. */ + left = added; + list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { + if (left-- == 0) + break; - RETURN(rc); + rc = ldlm_cli_cancel_local(lock); + if (rc == LDLM_FL_BL_AST) { + CFS_LIST_HEAD(head); + + LDLM_DEBUG(lock, "Cancel lock separately"); + list_del_init(&lock->l_bl_ast); + list_add(&lock->l_bl_ast, &head); + ldlm_cli_cancel_req(lock->l_conn_export, &head, 1); + rc = LDLM_FL_LOCAL_ONLY; + } + if (rc == LDLM_FL_LOCAL_ONLY) { + /* CANCEL RPC should not be sent to server. */ + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); + added--; + } + + } + RETURN(added); } -static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, - struct ldlm_res_id res_id, int flags, - void *opaque) +/* when called with LDLM_ASYNC the blocking callback will be handled + * in a thread and this function will return after the thread has been + * asked to call the callback. when called with LDLM_SYNC the blocking + * callback will be performed in this function. */ +int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) { - struct list_head *tmp, *next, list = CFS_LIST_HEAD_INIT(list); - struct ldlm_resource *res; - struct ldlm_lock *lock; + CFS_LIST_HEAD(cancels); + int count, rc; ENTRY; - res = ldlm_resource_get(ns, NULL, res_id, 0, 0); - if (res == NULL) { - /* This is not a problem. */ - CDEBUG(D_INFO, "No resource "LPU64"\n", res_id.name[0]); - RETURN(0); +#ifndef __KERNEL__ + sync = LDLM_SYNC; /* force to be sync in user space */ +#endif + count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0); + if (sync == LDLM_ASYNC) { + struct ldlm_lock *lock, *next; + list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) { + /* Remove from the list to allow blocking thread to + * re-use l_bl_ast. */ + list_del_init(&lock->l_bl_ast); + rc = ldlm_bl_to_thread(ns, NULL, lock, + LDLM_FL_CANCELING); + if (rc) + list_add_tail(&lock->l_bl_ast, &next->l_bl_ast); + } } - lock_res(res); - list_for_each(tmp, &res->lr_granted) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + /* If some locks are left in the list in ASYNC mode, or + * this is SYNC mode, cancel the list. */ + ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF); + RETURN(0); +} +/* Find and cancel locally unused locks found on resource, matched to the + * given policy, mode. GET the found locks and add them into the @cancels + * list. 
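+ *
+ * The plain form is used by ldlm_cli_cancel_unused_resource() below:
+ *
+ *     count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE,
+ *                                        0, flags, opaque);
+ *     rc = ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF);
+ *
+ * ELC callers are expected to pass the inodebits policy and the mode of the
+ * lock being requested so that only truly conflicting locks are gathered.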
*/ +int ldlm_cancel_resource_local(struct ldlm_resource *res, + struct list_head *cancels, + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int lock_flags, + int flags, void *opaque) +{ + struct ldlm_lock *lock, *next; + int count = 0, left; + ENTRY; + + lock_res(res); + list_for_each_entry(lock, &res->lr_granted, l_res_link) { if (opaque != NULL && lock->l_ast_data != opaque) { LDLM_ERROR(lock, "data %p doesn't match opaque %p", lock->l_ast_data, opaque); @@ -900,34 +1104,133 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, continue; } + /* If somebody is already doing CANCEL, or blocking ast came, + * skip this lock. */ + if (lock->l_flags & LDLM_FL_BL_AST || + lock->l_flags & LDLM_FL_CANCELING) + continue; + + if (lockmode_compat(lock->l_granted_mode, mode)) + continue; + + /* If policy is given and this is IBITS lock, add to list only + * those locks that match by policy. */ + if (policy && (lock->l_resource->lr_type == LDLM_IBITS) && + !(lock->l_policy_data.l_inodebits.bits & + policy->l_inodebits.bits)) + continue; + /* See CBPENDING comment in ldlm_cancel_lru */ - lock->l_flags |= LDLM_FL_CBPENDING; + lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING | + lock_flags; LASSERT(list_empty(&lock->l_bl_ast)); - list_add(&lock->l_bl_ast, &list); + list_add(&lock->l_bl_ast, cancels); LDLM_LOCK_GET(lock); + count++; } unlock_res(res); - list_for_each_safe(tmp, next, &list) { - struct lustre_handle lockh; - int rc; - lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); + /* Handle only @count inserted locks. */ + left = count; + list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { + int rc = LDLM_FL_LOCAL_ONLY; - if (flags & LDLM_FL_LOCAL_ONLY) { + if (left-- == 0) + break; + if (flags & LDLM_FL_LOCAL_ONLY) ldlm_lock_cancel(lock); + else + rc = ldlm_cli_cancel_local(lock); + + if (rc == LDLM_FL_BL_AST) { + CFS_LIST_HEAD(head); + + LDLM_DEBUG(lock, "Cancel lock separately"); + list_del_init(&lock->l_bl_ast); + list_add(&lock->l_bl_ast, &head); + ldlm_cli_cancel_req(lock->l_conn_export, &head, 1); + rc = LDLM_FL_LOCAL_ONLY; + } + if (rc == LDLM_FL_LOCAL_ONLY) { + /* CANCEL RPC should not be sent to server. */ + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); + count--; + } + } + RETURN(count); +} + +/* If @req is NULL, send CANCEL request to server with handles of locks + * in the @cancels. If EARLY_CANCEL is not supported, send CANCEL requests + * separately per lock. + * If @req is not NULL, put handles of locks in @cancels into the request + * buffer at the offset @off. + * Destroy @cancels at the end. 
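+ *
+ * Minimal usage sketch (illustrative only; it mirrors
+ * ldlm_cli_cancel_unused_resource() below):
+ *
+ *	CFS_LIST_HEAD(cancels);
+ *	int count, rc;
+ *
+ *	count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE,
+ *	                                   0, 0, NULL);
+ *	rc = ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF);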
*/ +int ldlm_cli_cancel_list(struct list_head *cancels, int count, + struct ptlrpc_request *req, int off) +{ + struct ldlm_lock *lock; + int res = 0; + ENTRY; + + if (list_empty(cancels) || count == 0) + RETURN(0); + + while (count) { + LASSERT(!list_empty(cancels)); + lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast); + LASSERT(lock->l_conn_export); + + if (exp_connect_cancelset(lock->l_conn_export)) { + res = count; + if (req) + ldlm_cancel_pack(req, off, cancels, count); + else + res = ldlm_cli_cancel_req(lock->l_conn_export, + cancels, count); } else { - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc != ELDLM_OK) - CERROR("ldlm_cli_cancel: %d\n", rc); + res = ldlm_cli_cancel_req(lock->l_conn_export, + cancels, 1); + } + + if (res < 0) { + CERROR("ldlm_cli_cancel_list: %d\n", res); + res = count; } - list_del_init(&lock->l_bl_ast); - LDLM_LOCK_PUT(lock); + + count -= res; + ldlm_lock_list_put(cancels, l_bl_ast, res); } + LASSERT(list_empty(cancels)); + RETURN(0); +} - ldlm_resource_putref(res); +static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, + struct ldlm_res_id res_id, + int flags, void *opaque) +{ + struct ldlm_resource *res; + CFS_LIST_HEAD(cancels); + int count; + int rc; + ENTRY; + + res = ldlm_resource_get(ns, NULL, res_id, 0, 0); + if (res == NULL) { + /* This is not a problem. */ + CDEBUG(D_INFO, "No resource "LPU64"\n", res_id.name[0]); + RETURN(0); + } + count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE, + 0, flags, opaque); + rc = ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc); + + ldlm_resource_putref(res); RETURN(0); } @@ -947,8 +1250,7 @@ static inline int have_no_nsresource(struct ldlm_namespace *ns) * that have 0 readers/writers. * * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying - * to notify the server. - * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */ + * to notify the server. 
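+ * Unused locks are collected per resource by ldlm_cancel_resource_local()
+ * and passed to ldlm_cli_cancel_list(), which batches the CANCEL RPCs when
+ * the export supports OBD_CONNECT_CANCELSET.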
*/ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, int flags, void *opaque) { @@ -978,7 +1280,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, flags, opaque); if (rc) - CERROR("cancel_unused_res ("LPU64"): %d\n", + CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n", res->lr_name.name[0], rc); spin_lock(&ns->ns_hash_lock); @@ -1016,6 +1318,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns, !lock->l_readers && !lock->l_writers && !(lock->l_flags & LDLM_FL_LOCAL) && !(lock->l_flags & LDLM_FL_CBPENDING)) { + lock->l_last_used = cfs_time_current(); spin_lock(&ns->ns_unused_lock); LASSERT(ns->ns_nr_unused >= 0); list_add_tail(&lock->l_lru, &ns->ns_unused_list); @@ -1261,7 +1564,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) ldlm_lock2desc(lock, &body->lock_desc); body->lock_flags = flags; - ldlm_lock2handle(lock, &body->lock_handle1); + ldlm_lock2handle(lock, &body->lock_handle[0]); size[DLM_LOCKREPLY_OFF] = sizeof(*reply); if (lock->l_lvb_len != 0) { buffers = 3; diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 8f5425e..0d489ce 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -118,6 +118,26 @@ static int lprocfs_uint_rd(char *page, char **start, off_t off, return snprintf(page, count, "%u\n", *temp); } +#define MAX_STRING_SIZE 128 +static int lprocfs_uint_wr(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + unsigned *p = data; + char dummy[MAX_STRING_SIZE + 1], *end; + unsigned long tmp; + + dummy[MAX_STRING_SIZE] = '\0'; + if (copy_from_user(dummy, buffer, MAX_STRING_SIZE)) + return -EFAULT; + + tmp = simple_strtoul(dummy, &end, 0); + if (dummy == end) + return -EINVAL; + + *p = (unsigned int)tmp; + return count; +} + static int lprocfs_read_lru_size(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -126,7 +146,6 @@ static int lprocfs_read_lru_size(char *page, char **start, off_t off, &ns->ns_max_unused); } -#define MAX_STRING_SIZE 128 static int lprocfs_write_lru_size(struct file *file, const char *buffer, unsigned long count, void *data) { @@ -200,6 +219,13 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) lock_vars[0].read_fptr = lprocfs_read_lru_size; lock_vars[0].write_fptr = lprocfs_write_lru_size; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + + snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_max_age", + ns->ns_name); + lock_vars[0].data = &ns->ns_max_age; + lock_vars[0].read_fptr = lprocfs_uint_rd; + lock_vars[0].write_fptr = lprocfs_uint_wr; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); } } #undef MAX_STRING_SIZE @@ -249,6 +275,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) CFS_INIT_LIST_HEAD(&ns->ns_unused_list); ns->ns_nr_unused = 0; ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE; + ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE; spin_lock_init(&ns->ns_unused_lock); mutex_down(&ldlm_namespace_lock); @@ -311,13 +338,15 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, lock->l_flags |= LDLM_FL_FAILED; lock->l_flags |= flags; + /* ... without sending a CANCEL message for local_only. */ + if (local_only) + lock->l_flags |= LDLM_FL_LOCAL_ONLY; + if (local_only && (lock->l_readers || lock->l_writers)) { /* This is a little bit gross, but much better than the * alternative: pretend that we got a blocking AST from * the server, so that when the lock is decref'd, it * will go away ... */ - /* ... without sending a CANCEL message. 
*/ - lock->l_flags |= LDLM_FL_LOCAL_ONLY; unlock_res(res); LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); if (lock->l_completion_ast) @@ -331,14 +360,9 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, unlock_res(res); ldlm_lock2handle(lock, &lockh); - if (!local_only) { - rc = ldlm_cli_cancel(&lockh); - if (rc) - CERROR("ldlm_cli_cancel: %d\n", rc); - } - /* Force local cleanup on errors, too. */ - if (local_only || rc != ELDLM_OK) - ldlm_lock_cancel(lock); + rc = ldlm_cli_cancel(&lockh); + if (rc) + CERROR("ldlm_cli_cancel: %d\n", rc); } else { ldlm_resource_unlink_lock(lock); unlock_res(res); diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index 87b7536..50a94e2 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -84,12 +84,14 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh); if (!rc) { + struct ldlm_enqueue_info einfo = {LDLM_IBITS, LCK_CR, + llu_mdc_blocking_ast, ldlm_completion_ast, NULL, inode}; + llu_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0); - rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, &it, LCK_CR, + rc = mdc_enqueue(sbi->ll_mdc_exp, &einfo, &it, &data, &lockh, NULL, 0, - ldlm_completion_ast, llu_mdc_blocking_ast, - inode, LDLM_FL_CANCEL_ON_BLOCK); + LDLM_FL_CANCEL_ON_BLOCK); request = (struct ptlrpc_request *)it.d.lustre.it_data; if (request) ptlrpc_req_finished(request); diff --git a/lustre/liblustre/file.c b/lustre/liblustre/file.c index 4069827..7d43699 100644 --- a/lustre/liblustre/file.c +++ b/lustre/liblustre/file.c @@ -97,6 +97,7 @@ void llu_prepare_mdc_op_data(struct mdc_op_data *data, data->namelen = namelen; data->create_mode = mode; data->mod_time = CURRENT_TIME; + data->data = NULL; } void obdo_refresh_inode(struct inode *dst, diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index eb22812..80457fc 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -226,7 +226,7 @@ int llu_glimpse_size(struct inode *inode) struct intnl_stat *st = llu_i2stat(inode); struct llu_sb_info *sbi = llu_i2sbi(inode); struct lustre_handle lockh = { 0 }; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ost_lvb lvb; int rc; @@ -242,7 +242,6 @@ int llu_glimpse_size(struct inode *inode) einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_flags = LDLM_FL_HAS_INTENT; einfo.ei_cb_bl = llu_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; @@ -251,6 +250,7 @@ int llu_glimpse_size(struct inode *inode) oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_lockh = &lockh; oinfo.oi_md = lli->lli_smd; + oinfo.oi_flags = LDLM_FL_HAS_INTENT; rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo); if (rc) { @@ -279,7 +279,7 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, { struct llu_sb_info *sbi = llu_i2sbi(inode); struct intnl_stat *st = llu_i2stat(inode); - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ost_lvb lvb; int rc; @@ -299,7 +299,6 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = mode; - einfo.ei_flags = ast_flags; einfo.ei_cb_bl = llu_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; @@ -308,8 +307,9 @@ int 
llu_extent_lock(struct ll_file_data *fd, struct inode *inode, oinfo.oi_policy = *policy; oinfo.oi_lockh = lockh; oinfo.oi_md = lsm; + oinfo.oi_flags = ast_flags; - rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo); + rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL); *policy = oinfo.oi_policy; if (rc > 0) rc = -EIO; diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 79a0fcb..6267ce5 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -1274,11 +1274,11 @@ static int llu_file_flock(struct inode *ino, struct llu_inode_info *lli = llu_i2info(ino); struct intnl_stat *st = llu_i2stat(ino); struct ldlm_res_id res_id = - { .name = {st->st_ino, - lli->lli_st_generation, LDLM_FLOCK} }; + { .name = {st->st_ino, lli->lli_st_generation, LDLM_FLOCK} }; + struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL, + ldlm_flock_completion_ast, NULL, file_lock }; struct lustre_handle lockh = {0}; ldlm_policy_data_t flock; - ldlm_mode_t mode = 0; int flags = 0; int rc; @@ -1291,13 +1291,13 @@ static int llu_file_flock(struct inode *ino, switch (file_lock->fl_type) { case F_RDLCK: - mode = LCK_PR; + einfo.ei_mode = LCK_PR; break; case F_UNLCK: - mode = LCK_NL; + einfo.ei_mode = LCK_NL; break; case F_WRLCK: - mode = LCK_PW; + einfo.ei_mode = LCK_PW; break; default: CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type); @@ -1328,7 +1328,7 @@ static int llu_file_flock(struct inode *ino, #endif #endif flags = LDLM_FL_TEST_LOCK; - file_lock->fl_type = mode; + file_lock->fl_type = einfo.ei_mode; break; default: CERROR("unknown fcntl cmd: %d\n", cmd); @@ -1338,12 +1338,11 @@ static int llu_file_flock(struct inode *ino, CDEBUG(D_DLMTRACE, "inode=%llu, pid=%u, flags=%#x, mode=%u, " "start="LPU64", end="LPU64"\n", (unsigned long long) st->st_ino, flock.l_flock.pid, - flags, mode, flock.l_flock.start, flock.l_flock.end); + flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); + + rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &einfo, res_id, + &flock, &flags, NULL, 0, NULL, &lockh, 0); - rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, res_id, - LDLM_FLOCK, &flock, mode, &flags, NULL, - ldlm_flock_completion_ast, NULL, - file_lock, NULL, 0, NULL, &lockh, 0); RETURN(rc); } @@ -1630,6 +1629,9 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags, struct llu_inode_info *lli2 = NULL; struct lov_stripe_md *lsm; struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags}; + struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR, + llu_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL }; + struct ptlrpc_request *req = NULL; struct lustre_md md; struct mdc_op_data data; @@ -1658,9 +1660,8 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags, llu_prepare_mdc_op_data(&data, NULL, ino, NULL, 0, O_RDWR); - rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, &oit, LCK_CR, &data, - &lockh, lum, lum_size, ldlm_completion_ast, - llu_mdc_blocking_ast, NULL, LDLM_FL_INTENT_ONLY); + rc = mdc_enqueue(sbi->ll_mdc_exp, &einfo, &oit, &data, + &lockh, lum, lum_size, LDLM_FL_INTENT_ONLY); if (rc) GOTO(out, rc); diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 2a1a78a..1e422ca 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -380,7 +380,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, LASSERT(it); ll_prepare_mdc_op_data(&op_data, de->d_parent->d_inode, de->d_inode, - de->d_name.name, de->d_name.len, 0); + de->d_name.name, de->d_name.len, 0, NULL); if ((it->it_op == IT_OPEN) && de->d_inode) { struct inode *inode = 
de->d_inode; @@ -517,7 +517,7 @@ do_lookup: } /*do real lookup here */ ll_prepare_mdc_op_data(&op_data, de->d_parent->d_inode, NULL, - de->d_name.name, de->d_name.len, 0); + de->d_name.name, de->d_name.len, 0, NULL); rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, 0, &req, ll_mdc_blocking_ast, 0); if (rc >= 0) { diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 34d2c7d..8862b5c 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -220,15 +220,15 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh); if (!rc) { struct lookup_intent it = { .it_op = IT_READDIR }; + struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR, + ll_mdc_blocking_ast, ldlm_completion_ast, NULL, dir }; struct ptlrpc_request *request; struct mdc_op_data data; - ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0); + ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0, NULL); - rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_IBITS, &it, - LCK_CR, &data, &lockh, NULL, 0, - ldlm_completion_ast, ll_mdc_blocking_ast, dir, - 0); + rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, &einfo, &it, + &data, &lockh, NULL, 0, 0); request = (struct ptlrpc_request *)it.d.lustre.it_data; if (request) @@ -402,7 +402,7 @@ int ll_dir_setstripe(struct inode *inode, struct lov_user_md *lump) if (lump->lmm_magic != cpu_to_le32(LOV_USER_MAGIC)) lustre_swab_lov_user_md(lump); - ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0); + ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0, NULL); /* swabbing is done in lov_setstripe() on server side */ rc = mdc_setattr(sbi->ll_mdc_exp, &data, diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 1bf51df..76b5753 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -266,7 +266,8 @@ static int ll_intent_file_open(struct file *file, void *lmm, if (!parent) RETURN(-ENOENT); - ll_prepare_mdc_op_data(&data, parent->d_inode, inode, name, len, O_RDWR); + ll_prepare_mdc_op_data(&data, parent->d_inode, inode, + name, len, O_RDWR, NULL); /* Usually we come here only for NFSD, and we want open lock. But we can also get here with pre 2.6.15 patchless kernels, and in @@ -943,7 +944,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, lstat_t *st) { struct lustre_handle lockh = { 0 }; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ost_lvb lvb; int rc; @@ -952,7 +953,6 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_flags = LDLM_FL_HAS_INTENT; einfo.ei_cb_bl = ll_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; @@ -961,6 +961,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_lockh = &lockh; oinfo.oi_md = lsm; + oinfo.oi_flags = LDLM_FL_HAS_INTENT; rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo); if (rc == -ENOENT) @@ -991,7 +992,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); struct lustre_handle lockh = { 0 }; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ost_lvb lvb; int rc; @@ -1013,7 +1014,6 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) * acquired only if there were no conflicting locks. 
*/ einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT; einfo.ei_cb_bl = ll_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; @@ -1022,6 +1022,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_lockh = &lockh; oinfo.oi_md = lli->lli_smd; + oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT; rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo); if (rc == -ENOENT) @@ -1054,7 +1055,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, { struct ll_sb_info *sbi = ll_i2sbi(inode); struct ost_lvb lvb; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; int rc; ENTRY; @@ -1076,7 +1077,6 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = mode; - einfo.ei_flags = ast_flags; einfo.ei_cb_bl = ll_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; @@ -1085,8 +1085,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, oinfo.oi_policy = *policy; oinfo.oi_lockh = lockh; oinfo.oi_md = lsm; + oinfo.oi_flags = ast_flags; - rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo); + rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL); *policy = oinfo.oi_policy; if (rc > 0) rc = -EIO; @@ -1872,10 +1873,11 @@ static int join_file(struct inode *head_inode, struct file *head_filp, struct dentry *tail_dentry = tail_filp->f_dentry; struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = head_filp->f_flags|O_JOIN_FILE}; + struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW, + ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL }; + struct lustre_handle lockh; struct mdc_op_data *op_data; - __u32 hsize = head_inode->i_size >> 32; - __u32 tsize = head_inode->i_size; int rc; ENTRY; @@ -1890,10 +1892,9 @@ static int join_file(struct inode *head_inode, struct file *head_filp, ll_prepare_mdc_op_data(op_data, head_inode, tail_parent, tail_dentry->d_name.name, - tail_dentry->d_name.len, 0); - rc = mdc_enqueue(ll_i2mdcexp(head_inode), LDLM_IBITS, &oit, LCK_PW, - op_data, &lockh, &tsize, 0, ldlm_completion_ast, - ll_mdc_blocking_ast, &hsize, 0); + tail_dentry->d_name.len, 0, &head_inode->i_size); + rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit, + op_data, &lockh, NULL, 0, 0); if (rc < 0) GOTO(out, rc); @@ -2244,9 +2245,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_res_id res_id = { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} }; + struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL, + ldlm_flock_completion_ast, NULL, file_lock }; struct lustre_handle lockh = {0}; ldlm_policy_data_t flock; - ldlm_mode_t mode = 0; int flags = 0; int rc; ENTRY; @@ -2267,7 +2269,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) switch (file_lock->fl_type) { case F_RDLCK: - mode = LCK_PR; + einfo.ei_mode = LCK_PR; break; case F_UNLCK: /* An unlock request may or may not have any relation to @@ -2278,10 +2280,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) * information that is given with a normal read or write record * lock request. To avoid creating another ldlm unlock (cancel) * message we'll treat a LCK_NL flock request as an unlock. 
*/ - mode = LCK_NL; + einfo.ei_mode = LCK_NL; break; case F_WRLCK: - mode = LCK_PW; + einfo.ei_mode = LCK_PW; break; default: CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type); @@ -2308,7 +2310,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) flags = LDLM_FL_TEST_LOCK; /* Save the old mode so that if the mode in the lock changes we * can decrement the appropriate reader or writer refcount. */ - file_lock->fl_type = mode; + file_lock->fl_type = einfo.ei_mode; break; default: CERROR("unknown fcntl lock command: %d\n", cmd); @@ -2317,12 +2319,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, " "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid, - flags, mode, flock.l_flock.start, flock.l_flock.end); + flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); - rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, res_id, - LDLM_FLOCK, &flock, mode, &flags, NULL, - ldlm_flock_completion_ast, NULL, file_lock, - NULL, 0, NULL, &lockh, 0); + rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id, + &flock, &flags, NULL, 0, NULL, &lockh, 0); if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0)) ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW)); #ifdef HAVE_F_OP_FLOCK @@ -2415,7 +2415,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) /* Call getattr by fid, so do not provide name at all. */ ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode, - dentry->d_inode, NULL, 0, 0); + dentry->d_inode, NULL, 0, 0, NULL); rc = mdc_intent_lock(exp, &op_data, NULL, 0, /* we are not interested in name based lookup */ diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 71b8115..604e594 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -441,7 +441,7 @@ int ll_mdc_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, int flag); int ll_prepare_mdc_op_data(struct mdc_op_data *, struct inode *i1, struct inode *i2, - const char *name, int namelen, int mode); + const char *name, int namelen, int mode, void *data); #ifndef LUSTRE_KERNEL_VERSION struct lookup_intent *ll_convert_intent(struct open_intent *oit, int lookup_flags); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 06e4843..81054d1 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -154,7 +154,8 @@ static int client_common_fill_super(struct super_block *sb, /* indicate the features supported by this client */ data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH | OBD_CONNECT_JOIN | - OBD_CONNECT_ATTRFID | OBD_CONNECT_VERSION; + OBD_CONNECT_ATTRFID | OBD_CONNECT_VERSION | + OBD_CONNECT_CANCELSET; #ifdef CONFIG_FS_POSIX_ACL data->ocd_connect_flags |= OBD_CONNECT_ACL; #endif @@ -255,7 +256,8 @@ static int client_common_fill_super(struct super_block *sb, } data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION | - OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE; + OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | + OBD_CONNECT_CANCELSET; CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d " "ocd_grant: %d\n", data->ocd_connect_flags, @@ -1257,7 +1259,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* We always do an MDS RPC, even if we're only changing the size; * only the MDS knows whether truncate() should fail with -ETXTBUSY */ - ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); + 
ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0, NULL); rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, attr, NULL, 0, NULL, 0, &request); @@ -1746,7 +1748,7 @@ int ll_iocontrol(struct inode *inode, struct file *file, if (!oinfo.oi_oa) RETURN(-ENOMEM); - ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); + ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0, NULL); memset(&attr, 0, sizeof(attr)); attr.ia_attr_flags = flags; diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 058009c..29b3a64 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -200,6 +200,17 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, if (inode == NULL) break; + LASSERT(lock->l_flags & LDLM_FL_CANCELING); + if ((bits & MDS_INODELOCK_LOOKUP) && + ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP)) + bits &= ~MDS_INODELOCK_LOOKUP; + if ((bits & MDS_INODELOCK_UPDATE) && + ll_have_md_lock(inode, MDS_INODELOCK_UPDATE)) + bits &= ~MDS_INODELOCK_UPDATE; + if ((bits & MDS_INODELOCK_OPEN) && + ll_have_md_lock(inode, MDS_INODELOCK_OPEN)) + bits &= ~MDS_INODELOCK_OPEN; + if (lock->l_resource->lr_name.name[0] != inode->i_ino || lock->l_resource->lr_name.name[1] != inode->i_generation) { LDLM_ERROR(lock, "data mismatch with ino %lu/%u (%p)", @@ -306,26 +317,27 @@ void ll_i2gids(__u32 *suppgids, struct inode *i1, struct inode *i2) } } -int ll_prepare_mdc_op_data(struct mdc_op_data *data, struct inode *i1, +int ll_prepare_mdc_op_data(struct mdc_op_data *op_data, struct inode *i1, struct inode *i2, const char *name, int namelen, - int mode) + int mode, void *data) { LASSERT(i1); if (namelen > ll_i2sbi(i1)->ll_namelen) return -ENAMETOOLONG; - ll_i2gids(data->suppgids, i1, i2); - ll_inode2fid(&data->fid1, i1); + ll_i2gids(op_data->suppgids, i1, i2); + ll_inode2fid(&op_data->fid1, i1); if (i2) - ll_inode2fid(&data->fid2, i2); + ll_inode2fid(&op_data->fid2, i2); else - memset(&data->fid2, 0, sizeof(data->fid2)); + memset(&op_data->fid2, 0, sizeof(op_data->fid2)); - data->name = name; - data->namelen = namelen; - data->create_mode = mode; - data->mod_time = CURRENT_SECONDS; + op_data->name = name; + op_data->namelen = namelen; + op_data->create_mode = mode; + op_data->mod_time = CURRENT_SECONDS; + op_data->data = data; return 0; } @@ -522,7 +534,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, icbd.icbd_parent = parent; rc = ll_prepare_mdc_op_data(&op_data, parent, NULL, dentry->d_name.name, - dentry->d_name.len, lookup_flags); + dentry->d_name.len, lookup_flags, NULL); if (rc) RETURN(ERR_PTR(rc)); @@ -813,7 +825,7 @@ static int ll_new_node(struct inode *dir, struct qstr *name, tgt_len = strlen(tgt)+1; err = ll_prepare_mdc_op_data(&op_data, dir, NULL, name->name, - name->len, 0); + name->len, 0, NULL); if (err) GOTO(err_exit, err); @@ -950,7 +962,7 @@ static int ll_link_generic(struct inode *src, struct inode *dir, dir->i_generation, dir, name->len, name->name); err = ll_prepare_mdc_op_data(&op_data, src, dir, name->name, - name->len, 0); + name->len, 0, NULL); if (err) GOTO(out, err); err = mdc_link(sbi->ll_mdc_exp, &op_data, &request); @@ -984,11 +996,27 @@ static int ll_mkdir_generic(struct inode *dir, struct qstr *name, int mode, RETURN(err); } +/* Try to find the child dentry by its name. + If found, put the result fid into @fid. 
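+   The fid is stored into mdc_op_data (fid3/fid4) so that the MDC can pack
+   cancels for the child's inode locks into the unlink/rename request.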
*/ +static void ll_get_child_fid(struct inode * dir, struct qstr *name, + struct ll_fid *fid) +{ + struct dentry *parent, *child; + + parent = list_entry(dir->i_dentry.next, struct dentry, d_alias); + child = d_lookup(parent, name); + if (child) { + if (child->d_inode) + ll_inode2fid(fid, child->d_inode); + dput(child); + } +} + static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent, struct qstr *name) { struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; + struct mdc_op_data op_data = {{0}}; struct dentry *dentry; int rc; ENTRY; @@ -1008,9 +1036,11 @@ static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent, } rc = ll_prepare_mdc_op_data(&op_data, dir, NULL, name->name, - name->len, S_IFDIR); + name->len, S_IFDIR, NULL); if (rc) GOTO(out, rc); + + ll_get_child_fid(dir, name, &op_data.fid3); rc = mdc_unlink(ll_i2sbi(dir)->ll_mdc_exp, &op_data, &request); if (rc) GOTO(out, rc); @@ -1100,7 +1130,7 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir) static int ll_unlink_generic(struct inode * dir, struct qstr *name) { struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; + struct mdc_op_data op_data = {{0}}; int rc; ENTRY; @@ -1108,9 +1138,11 @@ static int ll_unlink_generic(struct inode * dir, struct qstr *name) name->len, name->name, dir->i_ino, dir->i_generation, dir); rc = ll_prepare_mdc_op_data(&op_data, dir, NULL, name->name, - name->len, 0); + name->len, 0, NULL); if (rc) GOTO(out, rc); + + ll_get_child_fid(dir, name, &op_data.fid3); rc = mdc_unlink(ll_i2sbi(dir)->ll_mdc_exp, &op_data, &request); if (rc) GOTO(out, rc); @@ -1131,7 +1163,7 @@ static int ll_rename_generic(struct inode *src, struct qstr *src_name, { struct ptlrpc_request *request = NULL; struct ll_sb_info *sbi = ll_i2sbi(src); - struct mdc_op_data op_data; + struct mdc_op_data op_data = {{0}}; int err; ENTRY; @@ -1140,9 +1172,12 @@ static int ll_rename_generic(struct inode *src, struct qstr *src_name, src->i_ino, src->i_generation, src, tgt_name->len, tgt_name->name, tgt->i_ino, tgt->i_generation, tgt); - err = ll_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0); + err = ll_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0, NULL); if (err) GOTO(out, err); + + ll_get_child_fid(src, src_name, &op_data.fid3); + ll_get_child_fid(tgt, tgt_name, &op_data.fid4); err = mdc_rename(sbi->ll_mdc_exp, &op_data, src_name->name, src_name->len, tgt_name->name, tgt_name->len, &request); diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index 62771c8..85c80cd 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -36,7 +36,7 @@ struct lov_request { }; struct lov_request_set { - struct obd_enqueue_info *set_ei; + struct ldlm_enqueue_info*set_ei; struct obd_info *set_oi; atomic_t set_refcount; struct obd_export *set_exp; @@ -195,9 +195,10 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *obd_info, obd_off end, struct lov_request_set **reqset); int lov_fini_sync_set(struct lov_request_set *set); int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo, + struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset); -int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc); +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, + struct ptlrpc_request_set *rqset); int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_stripe_md *lsm, ldlm_policy_data_t *policy, __u32 mode, diff --git 
a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 5b9112f..f03b82a 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1832,12 +1832,13 @@ static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset, { struct lov_request_set *lovset = (struct lov_request_set *)data; ENTRY; - rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc); + rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset); RETURN(rc); } static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset) { struct lov_request_set *set; struct lov_request *req; @@ -1850,7 +1851,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, ASSERT_LSM_MAGIC(oinfo->oi_md); /* we should never be asked to replay a lock this way. */ - LASSERT((einfo->ei_flags & LDLM_FL_REPLAY) == 0); + LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0); if (!exp || !exp->exp_obd) RETURN(-ENODEV); @@ -1864,20 +1865,20 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, req = list_entry(pos, struct lov_request, rq_link); rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp, - &req->rq_oi, einfo); + &req->rq_oi, einfo, rqset); if (rc != ELDLM_OK) GOTO(out, rc); } - if (einfo->ei_rqset && !list_empty(&einfo->ei_rqset->set_requests)) { + if (rqset && !list_empty(&rqset->set_requests)) { LASSERT(rc == 0); - LASSERT(einfo->ei_rqset->set_interpret == NULL); - einfo->ei_rqset->set_interpret = lov_enqueue_interpret; - einfo->ei_rqset->set_arg = (void *)set; + LASSERT(rqset->set_interpret == NULL); + rqset->set_interpret = lov_enqueue_interpret; + rqset->set_arg = (void *)set; RETURN(rc); } out: - rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc); + rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset); RETURN(rc); } diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index c7d96a9..df404c0 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -161,7 +161,7 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) ldlm_lock_allow_match(lock); LDLM_LOCK_PUT(lock); } else if ((rc == ELDLM_LOCK_ABORTED) && - (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) { + (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) { memset(lov_lockhp, 0, sizeof(*lov_lockhp)); lov_stripe_lock(set->set_oi->oi_md); loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb; @@ -192,7 +192,7 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) /* The callback for osc_enqueue that updates lov info for every OSC request. */ static int cb_update_enqueue(struct obd_info *oinfo, int rc) { - struct obd_enqueue_info *einfo; + struct ldlm_enqueue_info *einfo; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); @@ -237,7 +237,8 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) RETURN(rc); } -int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, + struct ptlrpc_request_set *rqset) { int ret = 0; ENTRY; @@ -247,7 +248,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) LASSERT(set->set_exp); /* Do enqueue_done only for sync requests and if any request * succeeded. 
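 * (Here a NULL @rqset is treated as the synchronous case.)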
*/ - if (!set->set_ei->ei_rqset) { + if (!rqset) { if (rc) set->set_completes = 0; ret = enqueue_done(set, mode); @@ -261,7 +262,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) } int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo, + struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset) { struct lov_obd *lov = &exp->exp_obd->u.lov; @@ -321,6 +322,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, /* Set lov request specific parameters. */ req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i; req->rq_oi.oi_cb_up = cb_update_enqueue; + req->rq_oi.oi_flags = oinfo->oi_flags; LASSERT(req->rq_oi.oi_lockh); @@ -347,7 +349,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, *reqset = set; RETURN(0); out_set: - lov_fini_enqueue_set(set, einfo->ei_mode, rc); + lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL); RETURN(rc); } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 618f430..2380297 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -229,48 +229,46 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, - int lock_type, - struct lookup_intent *it, - int lock_mode, - struct mdc_op_data *data, - struct lustre_handle *lockh, - void *lmm, - int lmmsize, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, - void *cb_data, int extra_lock_flags) +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct mdc_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, + int extra_lock_flags) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id = - { .name = {data->fid1.id, data->fid1.generation} }; + { .name = {op_data->fid1.id, op_data->fid1.generation} }; ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; struct ldlm_request *lockreq; struct ldlm_intent *lit; struct ldlm_reply *lockrep; int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*lockreq), - [DLM_INTENT_IT_OFF] = sizeof(*lit) }; + [DLM_INTENT_IT_OFF] = sizeof(*lit), + 0, 0, 0, 0 }; int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize }; + cl_max_mds_easize, 0 }; int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; int repbufcnt = 4, rc; void *eadata; ENTRY; - LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type); + LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); // LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu", // ldlm_it2str(it->it_op), it_name, it_inode->i_ino); if (it->it_op & IT_OPEN) { + CFS_LIST_HEAD(cancels); + int count = 0; + int mode; + it->it_create_mode |= S_IFREG; size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; /* As an optimization, we allocate an RPC request buffer for * at least a default-sized LOV EA even if we aren't sending * one. 
We grow the whole request to the next power-of-two @@ -286,27 +284,40 @@ int mdc_enqueue(struct obd_export *exp, min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc, obddev->u.cli.cl_max_mds_easize); - if (it->it_flags & O_JOIN_FILE) { - __u64 head_size = *(__u32*)cb_data; - __u32 tsize = *(__u32*)lmm; + /* If inode is known, cancel conflicting OPEN locks. */ + if (op_data->fid2.id) { + if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + mode = LCK_CW; +#ifdef FMODE_EXEC + else if (it->it_flags & FMODE_EXEC) + mode = LCK_PR; +#endif + else + mode = LCK_CR; + count = mdc_resource_get_unused(exp, &op_data->fid2, + &cancels, mode, + MDS_INODELOCK_OPEN); + } + /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ + if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE) + mode = LCK_EX; + else + mode = LCK_CR; + count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels, + mode, MDS_INODELOCK_UPDATE); + if (it->it_flags & O_JOIN_FILE) { /* join is like an unlink of the tail */ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join); - req = ptlrpc_prep_req(class_exp2cliimp(exp), - LUSTRE_DLM_VERSION, LDLM_ENQUEUE, - 7, size, NULL); - /* when joining file, cb_data and lmm args together - * indicate the head file size*/ - mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, - (head_size << 32) | tsize); - cb_data = NULL; - lmm = NULL; + req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, + count); + mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data, + (*(__u64 *)op_data->data)); } else { - req = ptlrpc_prep_req(class_exp2cliimp(exp), - LUSTRE_DLM_VERSION, LDLM_ENQUEUE, - 6, size, NULL); + req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, + count); } if (!req) @@ -322,16 +333,16 @@ int mdc_enqueue(struct obd_export *exp, lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_open_pack(req, DLM_INTENT_REC_OFF, data, it->it_create_mode, - 0, it->it_flags, lmm, lmmsize); + mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data, + it->it_create_mode, 0, it->it_flags, + lmm, lmmsize); repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; } else if (it->it_op & IT_UNLINK) { size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 5, size, NULL); + req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -341,7 +352,7 @@ int mdc_enqueue(struct obd_export *exp, lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data); + mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data); repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize; } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { @@ -349,13 +360,12 @@ int mdc_enqueue(struct obd_export *exp, OBD_MD_FLACL | OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA; size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; if (it->it_op & IT_GETATTR) policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 5, size, NULL); + req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -366,13 +376,12 @@ int mdc_enqueue(struct obd_export *exp, /* pack the intended request */ mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, - 
it->it_flags, data); + it->it_flags, op_data); repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; } else if (it->it_op == IT_READDIR) { policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -390,9 +399,8 @@ int mdc_enqueue(struct obd_export *exp, * rpcs in flight counter */ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy, - lock_mode, &flags, cb_blocking, cb_completion, - NULL, cb_data, NULL, 0, NULL, lockh, 0); + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL, + 0, NULL, lockh, 0); mdc_exit_request(&obddev->u.cli); mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); @@ -407,7 +415,7 @@ int mdc_enqueue(struct obd_export *exp, /* This can go when we're sure that this can never happen */ LASSERT(rc != -ENOENT); if (rc == ELDLM_LOCK_ABORTED) { - lock_mode = 0; + einfo->ei_mode = 0; memset(lockh, 0, sizeof(*lockh)); rc = 0; } else if (rc != 0) { @@ -422,10 +430,10 @@ int mdc_enqueue(struct obd_export *exp, /* If the server gave us back a different lock mode, we should * fix up our variables. */ - if (lock->l_req_mode != lock_mode) { + if (lock->l_req_mode != einfo->ei_mode) { ldlm_lock_addref(lockh, lock->l_req_mode); - ldlm_lock_decref(lockh, lock_mode); - lock_mode = lock->l_req_mode; + ldlm_lock_decref(lockh, einfo->ei_mode); + einfo->ei_mode = lock->l_req_mode; } LDLM_LOCK_PUT(lock); } @@ -437,7 +445,7 @@ int mdc_enqueue(struct obd_export *exp, it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1; it->d.lustre.it_status = (int)lockrep->lock_policy_res2; - it->d.lustre.it_lock_mode = lock_mode; + it->d.lustre.it_lock_mode = einfo->ei_mode; it->d.lustre.it_data = req; if (it->d.lustre.it_status < 0 && req->rq_replay) @@ -624,11 +632,12 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, * this and use the request from revalidate. In this case, revalidate * never dropped its reference, so the refcounts are all OK */ if (!it_disposition(it, DISP_ENQ_COMPLETE)) { + struct ldlm_enqueue_info einfo = + { LDLM_IBITS, it_to_lock_mode(it), cb_blocking, + ldlm_completion_ast, NULL, NULL }; - rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it), - op_data, &lockh, lmm, lmmsize, - ldlm_completion_ast, cb_blocking, NULL, - extra_lock_flags); + rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, + lmm, lmmsize, extra_lock_flags); if (rc < 0) RETURN(rc); memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index e00a369..bc8ef76 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -62,23 +62,50 @@ static int mdc_reint(struct ptlrpc_request *request, return rc; } +/* Find and cancel locally locks matched by inode @bits & @mode in the resource + * found by @fid. Found locks are added into @cancel list. Returns the amount of + * locks added to @cancels list. 
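+ *
+ * Typical use (sketch only; mirrors the reint callers below):
+ *
+ *	count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
+ *	                                LCK_EX, MDS_INODELOCK_UPDATE);
+ *	if (exp_connect_cancelset(exp) && count)
+ *		size[REQ_REC_OFF + 2] = ldlm_request_bufsize(count, MDS_REINT);
+ *	...
+ *	ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 2);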
*/ +int mdc_resource_get_unused(struct obd_export *exp, struct ll_fid *fid, + struct list_head *cancels, ldlm_mode_t mode, + __u64 bits) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_res_id res_id = { .name = {fid->id, fid->generation} }; + struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0); + ldlm_policy_data_t policy = {{0}}; + int count; + ENTRY; + + if (res == NULL) + RETURN(0); + + /* Initialize ibits lock policy. */ + policy.l_inodebits.bits = bits; + count = ldlm_cancel_resource_local(res, cancels, &policy, + mode, 0, 0, NULL); + ldlm_resource_putref(res); + RETURN(count); +} + /* If mdc_setattr is called with an 'iattr', then it is a normal RPC that * should take the normal semaphore and go to the normal portal. * * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a * magic open-path setattr that should take the setattr semaphore and * go to the setattr portal. */ -int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data, +int mdc_setattr(struct obd_export *exp, struct mdc_op_data *op_data, struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct ptlrpc_request *req; struct mds_rec_setattr *rec; struct mdc_rpc_lock *rpc_lock; struct obd_device *obd = exp->exp_obd; - int size[4] = { sizeof(struct ptlrpc_body), - sizeof(*rec), ealen, ea2len }; - int bufcount = 2, rc; + int size[5] = { sizeof(struct ptlrpc_body), + sizeof(*rec), ealen, ea2len, 0 }; + int count, bufcount = 2, rc; + __u64 bits; ENTRY; LASSERT(iattr != NULL); @@ -89,8 +116,22 @@ int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data, bufcount++; } + bits = MDS_INODELOCK_UPDATE; + if (iattr->ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID)) + bits |= MDS_INODELOCK_LOOKUP; + count = mdc_resource_get_unused(exp, &op_data->fid1, + &cancels, LCK_EX, bits); + if (exp_connect_cancelset(exp) && count) { + bufcount = 5; + size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); @@ -104,7 +145,8 @@ int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data, if (iattr->ia_valid & (ATTR_MTIME | ATTR_CTIME)) CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n", LTIME_S(iattr->ia_mtime), LTIME_S(iattr->ia_ctime)); - mdc_setattr_pack(req, REQ_REC_OFF, data, iattr, ea, ealen, ea2, ea2len); + mdc_setattr_pack(req, REQ_REC_OFF, op_data, iattr, + ea, ealen, ea2, ea2len); size[REPLY_REC_OFF] = sizeof(struct mds_body); ptlrpc_req_set_repsize(req, 2, size); @@ -121,12 +163,14 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data, const void *data, int datalen, int mode, __u32 uid, __u32 gid, __u32 cap_effective, __u64 rdev, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req; int level, bufcount = 3, rc; - int size[4] = { sizeof(struct ptlrpc_body), + int size[5] = { sizeof(struct ptlrpc_body), sizeof(struct mds_rec_create), - op_data->namelen + 1 }; + op_data->namelen + 1, 0, 0 }; + int count; ENTRY; if (data && datalen) { @@ -134,8 +178,19 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data, bufcount++; } + count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels, + LCK_EX, MDS_INODELOCK_UPDATE); + if 
(exp_connect_cancelset(exp) && count) { + bufcount = 5; + size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); @@ -163,20 +218,35 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data, RETURN(rc); } -int mdc_unlink(struct obd_export *exp, struct mdc_op_data *data, +int mdc_unlink(struct obd_export *exp, struct mdc_op_data *op_data, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req = *request; int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_rec_unlink), - data->namelen + 1 }; - int rc; + op_data->namelen + 1, 0 }; + int count, rc, bufcount = 3; ENTRY; LASSERT(req == NULL); + count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels, + LCK_EX, MDS_INODELOCK_UPDATE); + if (op_data->fid3.id) + count += mdc_resource_get_unused(exp, &op_data->fid3, &cancels, + LCK_EX, MDS_INODELOCK_FULL); + if (exp_connect_cancelset(exp) && count) { + bufcount = 4; + size[REQ_REC_OFF + 2] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_REINT, 3, size, NULL); + MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 2); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); *request = req; @@ -186,7 +256,7 @@ int mdc_unlink(struct obd_export *exp, struct mdc_op_data *data, size[REPLY_REC_OFF + 2] = obd->u.cli.cl_max_mds_cookiesize; ptlrpc_req_set_repsize(req, 4, size); - mdc_unlink_pack(req, REQ_REC_OFF, data); + mdc_unlink_pack(req, REQ_REC_OFF, op_data); rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL); if (rc == -ERESTARTSYS) @@ -194,23 +264,37 @@ int mdc_unlink(struct obd_export *exp, struct mdc_op_data *data, RETURN(rc); } -int mdc_link(struct obd_export *exp, struct mdc_op_data *data, +int mdc_link(struct obd_export *exp, struct mdc_op_data *op_data, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req; - int size[3] = { sizeof(struct ptlrpc_body), + int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_rec_link), - data->namelen + 1 }; - int rc; + op_data->namelen + 1, 0 }; + int count, rc, bufcount = 3; ENTRY; + count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels, + LCK_EX, MDS_INODELOCK_UPDATE); + count += mdc_resource_get_unused(exp, &op_data->fid2, &cancels, + LCK_EX, MDS_INODELOCK_UPDATE); + if (exp_connect_cancelset(exp) && count) { + bufcount = 4; + size[REQ_REC_OFF + 2] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_REINT, 3, size, NULL); + MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 2); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); - mdc_link_pack(req, REQ_REC_OFF, data); + mdc_link_pack(req, REQ_REC_OFF, op_data); size[REPLY_REC_OFF] = sizeof(struct mds_body); ptlrpc_req_set_repsize(req, 2, size); @@ -223,25 +307,44 @@ int mdc_link(struct obd_export *exp, struct mdc_op_data *data, RETURN(rc); } -int mdc_rename(struct obd_export *exp, struct mdc_op_data *data, +int mdc_rename(struct 
obd_export *exp, struct mdc_op_data *op_data, const char *old, int oldlen, const char *new, int newlen, struct ptlrpc_request **request) { + CFS_LIST_HEAD(cancels); struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req; - int size[4] = { sizeof(struct ptlrpc_body), + int size[5] = { sizeof(struct ptlrpc_body), sizeof(struct mds_rec_rename), - oldlen + 1, - newlen + 1 }; - int rc; + oldlen + 1, newlen + 1, 0 }; + int count, rc, bufcount = 4; ENTRY; + count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels, + LCK_EX, MDS_INODELOCK_UPDATE); + count += mdc_resource_get_unused(exp, &op_data->fid2, &cancels, + LCK_EX, MDS_INODELOCK_UPDATE); + if (op_data->fid3.id) + count += mdc_resource_get_unused(exp, &op_data->fid3, &cancels, + LCK_EX, MDS_INODELOCK_LOOKUP); + if (op_data->fid4.id) + count += mdc_resource_get_unused(exp, &op_data->fid4, &cancels, + LCK_EX, MDS_INODELOCK_FULL); + if (exp_connect_cancelset(exp) && count) { + bufcount = 5; + size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_REINT, 4, size, NULL); + MDS_REINT, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (req == NULL) RETURN(-ENOMEM); - mdc_rename_pack(req, REQ_REC_OFF, data, old, oldlen, new, newlen); + mdc_rename_pack(req, REQ_REC_OFF, op_data, old, oldlen, new, newlen); size[REPLY_REC_OFF] = sizeof(struct mds_body); size[REPLY_REC_OFF + 1] = obd->u.cli.cl_max_mds_easize; diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 1ffac2cb..503cb4e 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -2258,7 +2258,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, struct obd_export *exp = req->rq_export; struct ldlm_request *dlmreq = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq)); - struct lustre_handle remote_hdl = dlmreq->lock_handle1; + struct lustre_handle remote_hdl = dlmreq->lock_handle[0]; struct list_head *iter; if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) diff --git a/lustre/mds/mds_lib.c b/lustre/mds/mds_lib.c index 2a68fb5..6f9bffb 100644 --- a/lustre/mds/mds_lib.c +++ b/lustre/mds/mds_lib.c @@ -120,21 +120,25 @@ static int mds_setattr_unpack(struct ptlrpc_request *req, int offset, r->ur_flags = rec->sa_attr_flags; LASSERT_REQSWAB (req, offset + 1); - if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 1) { + r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); + if (r->ur_eadatalen) { r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 1, 0); if (r->ur_eadata == NULL) RETURN(-EFAULT); - r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); } - - if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) { + r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (r->ur_cookielen) { r->ur_logcookies = lustre_msg_buf(req->rq_reqmsg, offset + 2,0); if (r->ur_eadata == NULL) RETURN (-EFAULT); - - r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); } - + if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 3, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -168,7 +172,8 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset, r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); LASSERT_REQSWAB(req, offset + 2); - if (lustre_msg_bufcount(req->rq_reqmsg) 
> offset + 2) { + r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (r->ur_tgtlen) { /* NB for now, we only seem to pass NULL terminated symlink * target strings here. If this ever changes, we'll have * to stop checking for a buffer filled completely with a @@ -179,7 +184,13 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset, r->ur_tgt = lustre_msg_string(req->rq_reqmsg, offset + 2, 0); if (r->ur_tgt == NULL) RETURN (-EFAULT); - r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + } + if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 3, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); } RETURN(0); } @@ -209,6 +220,13 @@ static int mds_link_unpack(struct ptlrpc_request *req, int offset, if (r->ur_name == NULL) RETURN (-EFAULT); r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); + if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 2, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -238,6 +256,14 @@ static int mds_unlink_unpack(struct ptlrpc_request *req, int offset, if (r->ur_name == NULL) RETURN(-EFAULT); r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); + + if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 2, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -272,6 +298,13 @@ static int mds_rename_unpack(struct ptlrpc_request *req, int offset, if (r->ur_tgt == NULL) RETURN(-EFAULT); r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 3, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -305,11 +338,11 @@ static int mds_open_unpack(struct ptlrpc_request *req, int offset, r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); LASSERT_REQSWAB(req, offset + 2); - if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) { + r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (r->ur_eadatalen) { r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 2, 0); if (r->ur_eadata == NULL) RETURN (-EFAULT); - r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); } RETURN(0); } diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index f4e3753..481059d 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -518,6 +518,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req)); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN || (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)) { de = mds_fid2dentry(mds, rec->ur_fid1, NULL); @@ -784,6 +787,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) GOTO(cleanup, rc = -ESTALE); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, &lockh, rec->ur_name, rec->ur_namelen - 1, MDS_INODELOCK_UPDATE); @@ -1568,6 +1574,9 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) GOTO(cleanup, 
rc = -ENOENT); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, &parent_lockh, &dparent, LCK_EX, MDS_INODELOCK_UPDATE, @@ -1807,6 +1816,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) GOTO(cleanup, rc = -ENOENT); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + /* Step 1: Lookup the source inode and target directory by FID */ de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL); if (IS_ERR(de_src)) @@ -2149,6 +2161,9 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir, rec->ur_fid2, &de_tgtdir, LCK_EX, rec->ur_name, rec->ur_namelen, diff --git a/lustre/mgc/mgc_request.c b/lustre/mgc/mgc_request.c index 30399e9..1d22aa1 100644 --- a/lustre/mgc/mgc_request.c +++ b/lustre/mgc/mgc_request.c @@ -598,6 +598,9 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, struct lustre_handle *lockh) { struct config_llog_data *cld = (struct config_llog_data *)data; + struct ldlm_enqueue_info einfo = { type, mode, mgc_blocking_ast, + ldlm_completion_ast, NULL, data}; + int rc; ENTRY; @@ -611,10 +614,8 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, /* We need a callback for every lockholder, so don't try to ldlm_lock_match (see rev 1.1.2.11.2.47) */ - rc = ldlm_cli_enqueue(exp, NULL, cld->cld_resid, - type, NULL, mode, flags, - mgc_blocking_ast, ldlm_completion_ast, NULL, - data, NULL, 0, NULL, lockh, 0); + rc = ldlm_cli_enqueue(exp, NULL, &einfo, cld->cld_resid, + NULL, flags, NULL, 0, NULL, lockh, 0); /* A failed enqueue should still call the mgc_blocking_ast, where it will be requeued if needed ("grant failed"). */ diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index cf7f33d..e420c84 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1102,7 +1102,7 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa, struct obd_device *obd = exp->exp_obd; struct echo_client_obd *ec = &obd->u.echo_client; struct lustre_handle *ulh = obdo_handle (oa); - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ec_object *eco; struct ec_lock *ecl; @@ -1140,7 +1140,7 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa, oinfo.oi_policy = ecl->ecl_policy; oinfo.oi_lockh = &ecl->ecl_lock_handle; oinfo.oi_md = eco->eco_lsm; - rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo); + rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo, NULL); if (rc != 0) goto failed_1; diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index aa57346..5b3b61b 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1470,8 +1470,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * * Of course, this will all disappear when we switch to * taking liblustre locks on the OST. */ - if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update) - ns->ns_lvbo->lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 0, 1); } RETURN(ELDLM_LOCK_ABORTED); } @@ -1497,8 +1496,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * XXX nikita: situation when ldlm_server_glimpse_ast() failed before * sending ast is not handled. This can result in lost client writes. 
*/ - if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) - ns->ns_lvbo->lvbo_update(res, NULL, 0, 1); + if (rc != 0) + ldlm_res_lvbo_update(res, NULL, 0, 1); lock_res(res); *reply_lvb = *res_lvb; @@ -2500,7 +2499,6 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, struct obd_trans_info *oti) { struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id } }; - struct ldlm_valblock_ops *ns_lvbo; struct filter_mod_data *fmd; struct lvfs_run_ctxt saved; struct filter_obd *filter; @@ -2535,9 +2533,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, res_id, LDLM_EXTENT, 0); if (res != NULL) { - ns_lvbo = res->lr_namespace->ns_lvbo; - if (ns_lvbo && ns_lvbo->lvbo_update) - rc = ns_lvbo->lvbo_update(res, NULL, 0, 0); + rc = ldlm_res_lvbo_update(res, NULL, 0, 0); ldlm_resource_putref(res); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 0a5b9ff..65833f7 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -543,6 +543,28 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, return rc; } +/* Find and cancel locally locks matched by @mode in the resource found by + * @objid. Found locks are added into @cancel list. Returns the amount of + * locks added to @cancels list. */ +static int osc_resource_get_unused(struct obd_export *exp, __u64 objid, + struct list_head *cancels, ldlm_mode_t mode, + int lock_flags) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_res_id res_id = { .name = { objid } }; + struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0); + int count; + ENTRY; + + if (res == NULL) + RETURN(0); + + count = ldlm_cancel_resource_local(res, cancels, NULL, mode, + lock_flags, 0, NULL); + ldlm_resource_putref(res); + RETURN(count); +} + /* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. @@ -557,9 +579,11 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *ea, struct obd_trans_info *oti, struct obd_export *md_export) { + CFS_LIST_HEAD(cancels); struct ptlrpc_request *req; struct ost_body *body; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 }; + int count, bufcount = 2; ENTRY; if (!oa) { @@ -567,8 +591,19 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, RETURN(-EINVAL); } + count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW, + LDLM_FL_DISCARD_DATA); + if (exp_connect_cancelset(exp) && count) { + bufcount = 3; + size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY); + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_DESTROY, 2, size, NULL); + OST_DESTROY, bufcount, size, NULL); + if (req) + ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1); + else + ldlm_lock_list_put(&cancels, l_bl_ast, count); + if (!req) RETURN(-ENOMEM); @@ -2686,7 +2721,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo, static int osc_enqueue_interpret(struct ptlrpc_request *req, struct osc_enqueue_args *aa, int rc) { - int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT; + int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT; struct lov_stripe_md *lsm = aa->oa_oi->oi_md; struct ldlm_lock *lock; @@ -2697,7 +2732,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req, /* Complete obtaining the lock procedure. 
*/ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, aa->oa_ei->ei_mode, - &aa->oa_ei->ei_flags, + &aa->oa_oi->oi_flags, &lsm->lsm_oinfo[0]->loi_lvb, sizeof(lsm->lsm_oinfo[0]->loi_lvb), lustre_swab_ost_lvb, @@ -2724,13 +2759,14 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req, * is excluded from the cluster -- such scenarious make the life difficult, so * release locks just after they are obtained. */ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset) { struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} }; struct obd_device *obd = exp->exp_obd; struct ldlm_reply *rep; struct ptlrpc_request *req = NULL; - int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT; + int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT; int rc; ENTRY; @@ -2744,12 +2780,13 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, goto no_match; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id, + rc = ldlm_lock_match(obd->obd_namespace, + oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode, oinfo->oi_lockh); if (rc == 1) { osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata, - einfo->ei_flags); + oinfo->oi_flags); if (intent) { /* I would like to be able to ASSERT here that rss <= * kms, but I can't, for reasons which are explained in @@ -2760,7 +2797,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up(oinfo, ELDLM_OK); /* For async requests, decref the lock. */ - if (einfo->ei_rqset) + if (rqset) ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode); RETURN(ELDLM_OK); @@ -2779,7 +2816,8 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, * locks out from other users right now, too. */ if (einfo->ei_mode == LCK_PR) { - rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, + rc = ldlm_lock_match(obd->obd_namespace, + oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, einfo->ei_type, &oinfo->oi_policy, LCK_PW, oinfo->oi_lockh); if (rc == 1) { @@ -2787,11 +2825,11 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, * be more elegant than adding another parameter to * lock_match. I want a second opinion. */ /* addref the lock only if not async requests. 
*/ - if (!einfo->ei_rqset) + if (!rqset) ldlm_lock_addref(oinfo->oi_lockh, LCK_PR); osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata, - einfo->ei_flags); + oinfo->oi_flags); oinfo->oi_cb_up(oinfo, ELDLM_OK); ldlm_lock_decref(oinfo->oi_lockh, LCK_PW); RETURN(ELDLM_OK); @@ -2802,10 +2840,10 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, if (intent) { int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) }; + [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), + [DLM_LOCKREQ_OFF + 1] = 0 }; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (req == NULL) RETURN(-ENOMEM); @@ -2816,18 +2854,15 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, } /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ - einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED; + oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED; - rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type, - &oinfo->oi_policy, einfo->ei_mode, - &einfo->ei_flags, einfo->ei_cb_bl, - einfo->ei_cb_cp, einfo->ei_cb_gl, - einfo->ei_cbdata, + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, + &oinfo->oi_policy, &oinfo->oi_flags, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb), lustre_swab_ost_lvb, oinfo->oi_lockh, - einfo->ei_rqset ? 1 : 0); - if (einfo->ei_rqset) { + rqset ? 1 : 0); + if (rqset) { if (!rc) { struct osc_enqueue_args *aa; CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); @@ -2837,7 +2872,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, aa->oa_exp = exp; req->rq_interpret_reply = osc_enqueue_interpret; - ptlrpc_set_add_req(einfo->ei_rqset, req); + ptlrpc_set_add_req(rqset, req); } else if (intent) { ptlrpc_req_finished(req); } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 3814526..1de75abb 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -92,6 +92,15 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, if (body == NULL) RETURN(-EFAULT); + if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) { + struct ldlm_request *dlm; + dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm), + lustre_swab_ldlm_request); + if (dlm == NULL) + RETURN (-EFAULT); + ldlm_request_cancel(req, dlm, 0); + } + rc = lustre_pack_reply(req, 2, size, NULL); if (rc) RETURN(rc); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index a0ff87c..6366cf1 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -570,13 +570,13 @@ static int ptlrpc_check_status(struct ptlrpc_request *req) err = lustre_msg_get_status(req->rq_repmsg); if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { + struct obd_export *exp = req->rq_export; + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + LCONSOLE_ERROR_MSG(0x011, "an error ocurred while communicating" " with %s The %s operation failed with %d", - req->rq_export ? - obd_export_nid2str(req->rq_export) - : "(no nid)", - ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)), - err); + exp ? obd_export_nid2str(exp) : "(no nid)", + ll_opcode2str(opc), err); RETURN(err < 0 ? 
err : -EINVAL); } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 0279dcf..6f53063 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2009,10 +2009,9 @@ void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l) void lustre_swab_ldlm_request (struct ldlm_request *rq) { __swab32s (&rq->lock_flags); - CLASSERT(offsetof(typeof(*rq), lock_padding) != 0); lustre_swab_ldlm_lock_desc (&rq->lock_desc); - /* lock_handle1 opaque */ - /* lock_handle2 opaque */ + __swab32s (&rq->lock_count); + /* lock_handle[] opaque */ } void lustre_swab_ldlm_reply (struct ldlm_reply *r) diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 0f40241..c29cb6f 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -462,7 +462,10 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_RMT_CLIENT == 0x20000ULL); CLASSERT(OBD_CONNECT_BRW_SIZE == 0x40000ULL); CLASSERT(OBD_CONNECT_QUOTA64 == 0x80000ULL); - + CLASSERT(OBD_CONNECT_FID_CAPA == 0x100000ULL); + CLASSERT(OBD_CONNECT_OSS_CAPA == 0x200000ULL); + CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL); + /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", (long long)(int)sizeof(struct obdo)); @@ -1468,22 +1471,18 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_request, lock_flags)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_request, lock_count) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_request, lock_count)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_count) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_count)); LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", (long long)(int)offsetof(struct ldlm_request, lock_desc)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); - LASSERTF((int)offsetof(struct ldlm_request, lock_handle1) == 88, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle1)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle1)); - LASSERTF((int)offsetof(struct ldlm_request, lock_handle2) == 96, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle2)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle2)); + LASSERTF((int)offsetof(struct ldlm_request, lock_handle) == 88, " found %lld\n", + (long long)(int)offsetof(struct ldlm_request, lock_handle)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle) == 16, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle)); /* Checks for struct ldlm_reply */ LASSERTF((int)sizeof(struct ldlm_reply) == 112, " found %lld\n", @@ -1492,14 +1491,14 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_reply, lock_flags)); 
LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); - LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_desc)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_padding) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_padding)); + LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_padding) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_desc) == 8, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_desc)); + LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_desc)); LASSERTF((int)offsetof(struct ldlm_reply, lock_handle) == 88, " found %lld\n", (long long)(int)offsetof(struct ldlm_reply, lock_handle)); LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8, " found %lld\n", diff --git a/lustre/tests/recovery-small.sh b/lustre/tests/recovery-small.sh index 7bd1efb..da37530 100755 --- a/lustre/tests/recovery-small.sh +++ b/lustre/tests/recovery-small.sh @@ -880,5 +880,17 @@ test_58() { # bug 11546 } run_test 58 "Eviction in the middle of open RPC reply processing" +test_59() { # bug 10589 + zconf_mount `hostname` $MOUNT2 || error "Failed to mount $MOUNT2" + sysctl -w lustre.fail_loc=0x311 + writes=`dd if=/dev/zero of=$DIR2/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'` + sysctl -w lustre.fail_loc=0 + sync + zconf_umount `hostname` $DIR2 -f + reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'` + [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes +} +run_test 59 "Read cancel race on client eviction" + $CLEANUP echo "$0: completed" diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index b1f6125..50540ca 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -3856,6 +3856,139 @@ test_119b() # bug 11737 } run_test 119b "Sparse directIO read must return actual read amount" +test_120a() { + mkdir $DIR/$tdir + cancel_lru_locks mdc + stat $DIR/$tdir > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + mkdir $DIR/$tdir/d1 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occurred." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occurred."
+} +run_test 120a "Early Lock Cancel: mkdir test" + +test_120b() { + mkdir $DIR/$tdir + cancel_lru_locks mdc + stat $DIR/$tdir > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + touch $DIR/$tdir/f1 + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occurred." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occurred." +} +run_test 120b "Early Lock Cancel: create test" + +test_120c() { + mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 + touch $DIR/$tdir/d1/f1 + cancel_lru_locks mdc + stat $DIR/$tdir/d1 $DIR/$tdir/d2 $DIR/$tdir/d1/f1 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + ln $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occurred." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occurred." +} +run_test 120c "Early Lock Cancel: link test" + +test_120d() { + touch $DIR/$tdir + cancel_lru_locks mdc + stat $DIR/$tdir > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + chmod a+x $DIR/$tdir + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occurred." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occurred." +} +run_test 120d "Early Lock Cancel: setattr test" + +test_120e() { + mkdir $DIR/$tdir + dd if=/dev/zero of=$DIR/$tdir/f1 count=1 + cancel_lru_locks mdc + cancel_lru_locks osc + dd if=$DIR/$tdir/f1 of=/dev/null + stat $DIR/$tdir $DIR/$tdir/f1 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + unlink $DIR/$tdir/f1 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occurred." + [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occurred." +} +run_test 120e "Early Lock Cancel: unlink test" + +test_120f() { + mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 + dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1 + dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1 + cancel_lru_locks mdc + cancel_lru_locks osc + dd if=$DIR/$tdir/d1/f1 of=/dev/null + dd if=$DIR/$tdir/d2/f2 of=/dev/null + stat $DIR/$tdir/d1 $DIR/$tdir/d2 $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2 > /dev/null + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + mv $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2 + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occurred." +} +run_test 120f "Early Lock Cancel: rename test" + +test_120g() { + count=10000 + echo create $count files + mkdir $DIR/$tdir + cancel_lru_locks mdc + cancel_lru_locks osc + t0=`date +%s` + + can0=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk0=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + createmany -o $DIR/$tdir/f $count + sync + can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + t1=`date +%s` + echo total: $((can1-can0)) cancels, $((blk1-blk0)) blockings + echo rm $count files + rm -r $DIR/$tdir + sync + can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats` + blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats` + t2=`date +%s` + echo total: $count removes in $((t2-t1)) + echo total: $((can2-can1)) cancels, $((blk2-blk1)) blockings + sleep 2 + # wait for commitment of removal +} +run_test 120g "Early Lock Cancel: performance test" + +test_121() { #bug #10589 + rm -rf $DIR/$tfile + writes=`dd if=/dev/zero of=$DIR/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'` + sysctl -w lustre.fail_loc=0x310 + cancel_lru_locks osc > /dev/null + reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'` + sysctl -w lustre.fail_loc=0 + [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes +} +run_test 121 "read cancel race =========" + TMPDIR=$OLDTMPDIR TMP=$OLDTMP HOME=$OLDHOME diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index b45ee6f..29f0264 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -176,6 +176,9 @@ static void check_obd_connect_data(void) CHECK_CDEFINE(OBD_CONNECT_RMT_CLIENT); CHECK_CDEFINE(OBD_CONNECT_BRW_SIZE); CHECK_CDEFINE(OBD_CONNECT_QUOTA64); + CHECK_CDEFINE(OBD_CONNECT_FID_CAPA); + CHECK_CDEFINE(OBD_CONNECT_OSS_CAPA); + CHECK_CDEFINE(OBD_CONNECT_CANCELSET); } static void @@ -661,10 +664,9 @@ check_ldlm_request(void) BLANK_LINE(); CHECK_STRUCT(ldlm_request); CHECK_MEMBER(ldlm_request, lock_flags); - CHECK_MEMBER(ldlm_request, lock_padding); + CHECK_MEMBER(ldlm_request, lock_count); CHECK_MEMBER(ldlm_request, lock_desc); - CHECK_MEMBER(ldlm_request, lock_handle1); - CHECK_MEMBER(ldlm_request, lock_handle2); + CHECK_MEMBER(ldlm_request, lock_handle); } static void @@ -673,8 +675,8 @@ check_ldlm_reply(void) BLANK_LINE(); CHECK_STRUCT(ldlm_reply); CHECK_MEMBER(ldlm_reply, lock_flags); - CHECK_MEMBER(ldlm_request, lock_padding); - CHECK_MEMBER(ldlm_request, lock_desc); + CHECK_MEMBER(ldlm_reply, lock_padding); + CHECK_MEMBER(ldlm_reply, lock_desc); CHECK_MEMBER(ldlm_reply, lock_handle); CHECK_MEMBER(ldlm_reply, lock_policy_res1); CHECK_MEMBER(ldlm_reply, lock_policy_res2); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 5d6d250..e07146f 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -478,7 +478,10 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_RMT_CLIENT == 0x20000ULL); CLASSERT(OBD_CONNECT_BRW_SIZE == 0x40000ULL); CLASSERT(OBD_CONNECT_QUOTA64 == 0x80000ULL); - + CLASSERT(OBD_CONNECT_FID_CAPA == 0x100000ULL); + CLASSERT(OBD_CONNECT_OSS_CAPA == 0x200000ULL); + CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL); + /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", (long long)(int)sizeof(struct obdo)); @@ -1484,22
+1487,18 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_request, lock_flags)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_request, lock_count) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_request, lock_count)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_count) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_count)); LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", (long long)(int)offsetof(struct ldlm_request, lock_desc)); LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); - LASSERTF((int)offsetof(struct ldlm_request, lock_handle1) == 88, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle1)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle1)); - LASSERTF((int)offsetof(struct ldlm_request, lock_handle2) == 96, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_handle2)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle2)); + LASSERTF((int)offsetof(struct ldlm_request, lock_handle) == 88, " found %lld\n", + (long long)(int)offsetof(struct ldlm_request, lock_handle)); + LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle) == 16, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle)); /* Checks for struct ldlm_reply */ LASSERTF((int)sizeof(struct ldlm_reply) == 112, " found %lld\n", @@ -1508,14 +1507,14 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_reply, lock_flags)); LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4, " found %lld\n", (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_flags)); - LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_padding)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding)); - LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n", - (long long)(int)offsetof(struct ldlm_request, lock_desc)); - LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_padding) == 4, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_padding)); + LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_padding) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_padding)); + LASSERTF((int)offsetof(struct ldlm_reply, lock_desc) == 8, " found %lld\n", + (long long)(int)offsetof(struct ldlm_reply, lock_desc)); + LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80, " found %lld\n", + (long 
long)(int)sizeof(((struct ldlm_reply *)0)->lock_desc)); LASSERTF((int)offsetof(struct ldlm_reply, lock_handle) == 88, " found %lld\n", (long long)(int)offsetof(struct ldlm_reply, lock_handle)); LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8, " found %lld\n", -- 1.8.3.1
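
Note (illustration only, not part of the patch): the wire-format change above replaces ldlm_request's lock_padding and fixed lock_handle1/lock_handle2 fields with a lock_count plus a two-slot lock_handle[] array (wiretest asserts lock_handle at offset 88, size 16, giving a 104-byte base structure), and callers such as osc_destroy() grow an extra request buffer via ldlm_request_bufsize() when the export advertises OBD_CONNECT_CANCELSET, so batched cancel handles ride along in the same RPC. The standalone C sketch below models only that layout and sizing arithmetic; the *_stub types and the cancelset_bufsize() helper are simplified stand-ins invented for illustration and are not Lustre APIs.

/* elc_size.c: model of the variable-length ldlm_request tail used for
 * early lock cancel.  Stand-in types only; sizes mirror the wiretest
 * assertions in the patch (lock_handle at offset 88, 104 bytes total). */
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>

struct lustre_handle_stub {
        uint64_t cookie;                          /* 8 bytes on the wire */
};

struct ldlm_lock_desc_stub {
        char opaque[80];                          /* 80 bytes on the wire */
};

struct ldlm_request_stub {
        uint32_t lock_flags;
        uint32_t lock_count;                      /* number of packed cancel handles */
        struct ldlm_lock_desc_stub lock_desc;
        struct lustre_handle_stub lock_handle[2]; /* two slots always present */
};

/* Bytes needed for a request that packs `count' cancel handles.  An enqueue
 * already occupies one embedded slot, so only one is free for cancels; other
 * request types have both.  Any further handles are appended after the
 * fixed structure. */
static size_t cancelset_bufsize(int count, int is_enqueue)
{
        int avail = 2 - (is_enqueue ? 1 : 0);
        int extra = count > avail ? count - avail : 0;

        return sizeof(struct ldlm_request_stub) +
               extra * sizeof(struct lustre_handle_stub);
}

int main(void)
{
        printf("base size %zu, lock_handle at offset %zu\n",
               sizeof(struct ldlm_request_stub),
               offsetof(struct ldlm_request_stub, lock_handle));
        printf("5 cancels on an OST_DESTROY-style request: %zu bytes\n",
               cancelset_bufsize(5, 0));
        printf("5 cancels piggybacked on an enqueue:       %zu bytes\n",
               cancelset_bufsize(5, 1));
        return 0;
}

Compiled with a plain C compiler (e.g. cc elc_size.c && ./a.out), this should report a 104-byte base with lock_handle at offset 88, matching the assertions above, and 128 and 136 bytes respectively for the two five-cancel cases.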