From fe79e1b3a463c6ff8d6a8c8be2b9505df5a5702a Mon Sep 17 00:00:00 2001 From: phil Date: Mon, 23 Feb 2004 23:37:30 +0000 Subject: [PATCH] b=1021,2720 land b_size on HEAD for 1.2 --- lnet/libcfs/debug.c | 6 +- lustre/ChangeLog | 2 + lustre/include/linux/lustre_compat25.h | 12 +- lustre/include/linux/lustre_dlm.h | 75 ++- lustre/include/linux/lustre_idl.h | 22 +- lustre/include/linux/lustre_net.h | 1 + lustre/include/linux/obd.h | 31 +- lustre/include/linux/obd_class.h | 43 +- lustre/include/linux/obd_echo.h | 2 +- lustre/include/linux/obd_support.h | 1 + lustre/include/linux/rbtree.h | 132 ----- .../kernel_patches/patches/2.6.0-test6-mm4.patch | 46 +- lustre/kernel_patches/patches/bproc-patch-2.4.20 | 6 +- lustre/ldlm/l_lock.c | 2 +- lustre/ldlm/ldlm_extent.c | 31 +- lustre/ldlm/ldlm_flock.c | 3 +- lustre/ldlm/ldlm_internal.h | 8 +- lustre/ldlm/ldlm_lock.c | 94 ++-- lustre/ldlm/ldlm_lockd.c | 210 ++++++-- lustre/ldlm/ldlm_request.c | 155 +++--- lustre/ldlm/ldlm_resource.c | 16 +- lustre/llite/dcache.c | 6 + lustre/llite/dir.c | 2 +- lustre/llite/file.c | 484 ++++++++++------- lustre/llite/llite_close.c | 16 +- lustre/llite/llite_internal.h | 9 +- lustre/llite/llite_lib.c | 17 +- lustre/llite/namei.c | 17 +- lustre/llite/rw.c | 31 +- lustre/lov/lov_obd.c | 584 ++++++++++++++++++--- lustre/mdc/mdc_locks.c | 17 +- lustre/mdc/mdc_request.c | 10 +- lustre/mds/handler.c | 257 +++++---- lustre/mds/mds_open.c | 8 +- lustre/mds/mds_reint.c | 43 +- lustre/obdclass/class_obd.c | 2 +- lustre/obdclass/lprocfs_status.c | 1 - lustre/obdclass/rbtree.c | 338 ------------ lustre/obdecho/echo_client.c | 13 +- lustre/obdfilter/Makefile.am | 2 +- lustre/obdfilter/filter.c | 228 +++++--- lustre/obdfilter/filter_internal.h | 7 +- lustre/obdfilter/filter_lvb.c | 189 +++++++ lustre/osc/osc_request.c | 61 +-- lustre/ost/ost_handler.c | 3 +- lustre/portals/libcfs/debug.c | 6 +- lustre/ptlrpc/lproc_ptlrpc.c | 1 + lustre/ptlrpc/pack_generic.c | 58 +- lustre/ptlrpc/ptlrpc_module.c | 2 + 
lustre/scripts/lustre.spec.in | 2 +- lustre/tests/sanity.sh | 13 +- lustre/tests/sanityN.sh | 37 +- lustre/utils/wirecheck.c | 6 +- lustre/utils/wiretest.c | 50 +- 54 files changed, 2002 insertions(+), 1416 deletions(-) delete mode 100644 lustre/include/linux/rbtree.h delete mode 100644 lustre/obdclass/rbtree.c create mode 100644 lustre/obdfilter/filter_lvb.c diff --git a/lnet/libcfs/debug.c b/lnet/libcfs/debug.c index 7ad9327..e98779f 100644 --- a/lnet/libcfs/debug.c +++ b/lnet/libcfs/debug.c @@ -945,9 +945,6 @@ char *portals_nid2str(int nal, ptl_nid_t nid, char *str) } #ifdef __KERNEL__ -#include -#if (LUSTRE_KERNEL_VERSION >= 30) -#warning "FIXME: remove workaround when l30 is widely used" char stack_backtrace[LUSTRE_TRACE_SIZE]; spinlock_t stack_backtrace_lock = SPIN_LOCK_UNLOCKED; @@ -958,7 +955,7 @@ extern int is_kernel_text_address(unsigned long addr); char *portals_debug_dumpstack(void) { asm("int $3"); - return "dump stack"; + return "dump stack\n"; } #elif defined(__i386__) @@ -1020,7 +1017,6 @@ char *portals_debug_dumpstack(void) #endif /* __arch_um__ */ EXPORT_SYMBOL(stack_backtrace_lock); EXPORT_SYMBOL(portals_debug_dumpstack); -#endif /* LUSTRE_KERNEL_VERSION < 30 */ #endif /* __KERNEL__ */ EXPORT_SYMBOL(portals_debug_dumplog); diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 54abc71..8807642 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -6,6 +6,8 @@ tbd Cluster File Systems, Inc. 
- reduce journal credits needed for BRW writes (2370) - orphan handling to avoid losing space on client/server crashes - ptlrpcd can be blocked, stopping ALL progress (2477) + - use lock value blocks to assist in proper KMS, faster stat (1021) + - takes i_sem instead of DLM locks internally on obdfilter (2720) - recovery for initial connections (2355) - fixes for mds_cleanup_orphans (1934) - abort_recovery crashes MDS in b_eq (mds_unlink_orphan) (2584) diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h index a1fb3dc..a0cafd9 100644 --- a/lustre/include/linux/lustre_compat25.h +++ b/lustre/include/linux/lustre_compat25.h @@ -56,6 +56,10 @@ #define ll_pgcache_lock(mapping) spin_lock(&mapping->page_lock) #define ll_pgcache_unlock(mapping) spin_unlock(&mapping->page_lock) +#define ll_call_writepage(inode, page) \ + (inode)->i_mapping->a_ops->writepage(page, NULL) +#define ll_truncate_complete_page(page) \ + truncate_complete_page(page->mapping, page) #define ll_vfs_create(a,b,c,d) vfs_create(a,b,c,d) @@ -79,11 +83,6 @@ static inline void lustre_daemonize_helper(void) current->tty = NULL; } -#define rb_node_s rb_node -#define rb_root_s rb_root -typedef struct rb_root_s rb_root_t; -typedef struct rb_node_s rb_node_t; - #define smp_num_cpus NR_CPUS #ifndef conditional_schedule @@ -141,6 +140,9 @@ typedef long sector_t; #define ll_pgcache_lock(mapping) spin_lock(&pagecache_lock) #define ll_pgcache_unlock(mapping) spin_unlock(&pagecache_lock) +#define ll_call_writepage(inode, page) \ + (inode)->i_mapping->a_ops->writepage(page) +#define ll_truncate_complete_page(page) truncate_complete_page(page) static inline void __d_drop(struct dentry *dentry) { diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 99c1785..e37dcb1 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -73,10 +73,19 @@ typedef enum { /* These are flags that are mapped into the flags and ASTs of 
blocking locks */ #define LDLM_AST_DISCARD_DATA 0x80000000 /* Add FL_DISCARD to blocking ASTs */ - /* Flags sent in AST lock_flags to be mapped into the receiving lock. */ #define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA) +/* XXX FIXME: This is being added to b_size as a low-risk fix to the fact that + * the LVB filling happens _after_ the lock has been granted, so another thread + * can match before the LVB has been updated. As a dirty hack, we set + * LDLM_FL_CAN_MATCH only after we've done the LVB poop. + * + * The proper fix is to do the granting inside of the completion AST, which can + * be replaced with a LVB-aware wrapping function for OSC locks. That change is + * pretty high-risk, though, and would need a lot more testing. */ +#define LDLM_FL_CAN_MATCH 0x100000 + /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. */ #define LDLM_CB_BLOCKING 1 @@ -124,6 +133,20 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) - */ +struct ldlm_lock; +struct ldlm_resource; +struct ldlm_namespace; + +typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **, + void *req_cookie, ldlm_mode_t mode, int flags, + void *data); + +struct ldlm_valblock_ops { + int (*lvbo_init)(struct ldlm_resource *res); + int (*lvbo_update)(struct ldlm_resource *res, struct lustre_msg *m, + int buf_idx); +}; + struct ldlm_namespace { char *ns_name; __u32 ns_client; /* is this a client-side lock tree? 
*/ @@ -143,6 +166,9 @@ struct ldlm_namespace { spinlock_t ns_counter_lock; __u64 ns_locks; __u64 ns_resources; + ldlm_res_policy ns_policy; + struct ldlm_valblock_ops *ns_lvbo; + void *ns_lvbp; }; /* @@ -162,6 +188,7 @@ typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock, int flag); typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags, void *data); +typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data); struct ldlm_lock { struct portals_handle l_handle; // must be first in the structure @@ -181,6 +208,7 @@ struct ldlm_lock { ldlm_completion_callback l_completion_ast; ldlm_blocking_callback l_blocking_ast; + ldlm_glimpse_callback l_glimpse_ast; void *l_ast_data; struct obd_export *l_export; @@ -190,7 +218,12 @@ struct ldlm_lock { __u32 l_flags; struct lustre_handle l_remote_handle; ldlm_policy_data_t l_policy_data; - __u32 l_version[RES_VERSION_SIZE]; + + /* This LVB is used only on the client side, as temporary storage for + * a lock value block received during an enqueue */ + __u32 l_lvb_len; + void *l_lvb_data; + void *l_lvb_swabber; __u32 l_readers; __u32 l_writers; @@ -201,11 +234,9 @@ struct ldlm_lock { * on this waitq to learn when it becomes granted. */ wait_queue_head_t l_waitq; struct timeval l_enqueued_time; + unsigned long l_last_used; /* jiffies */ }; -typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **, - void *req_cookie, ldlm_mode_t mode, int flags, - void *data); #define LDLM_PLAIN 10 #define LDLM_EXTENT 11 @@ -229,9 +260,13 @@ struct ldlm_resource { __u32 lr_type; /* LDLM_PLAIN or LDLM_EXTENT */ struct ldlm_resource *lr_root; struct ldlm_res_id lr_name; - __u32 lr_version[RES_VERSION_SIZE]; atomic_t lr_refcount; + /* Server-side-only lock value block elements */ + struct semaphore lr_lvb_sem; + __u32 lr_lvb_len; + void *lr_lvb_data; + /* lr_tmp holds a list head temporarily, during the building of a work * queue. 
see ldlm_add_ast_work_item and ldlm_run_ast_work */ void *lr_tmp; @@ -341,6 +376,9 @@ do { \ #define LDLM_DEBUG_NOLOCK(format, a...) \ CDEBUG(D_DLMTRACE, "### " format "\n" , ## a) +typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags, + int first_enq, ldlm_error_t *err); + /* * Iterators. */ @@ -365,12 +403,17 @@ void ldlm_change_cbdata(struct ldlm_namespace *, struct ldlm_res_id *, /* ldlm_flock.c */ int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data); +/* ldlm_extent.c */ +__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms); + + /* ldlm_lockd.c */ int ldlm_server_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, int flag); int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data); +int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data); int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback, - ldlm_blocking_callback); + ldlm_blocking_callback, ldlm_glimpse_callback); int ldlm_handle_convert(struct ptlrpc_request *req); int ldlm_handle_cancel(struct ptlrpc_request *req); int ldlm_del_waiting_lock(struct ldlm_lock *lock); @@ -378,8 +421,8 @@ int ldlm_get_ref(void); void ldlm_put_ref(int force); /* ldlm_lock.c */ -void ldlm_register_intent(ldlm_res_policy arg); -void ldlm_unregister_intent(void); +ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res); +void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg); void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int flags); void ldlm_cancel_callback(struct ldlm_lock *); @@ -413,8 +456,9 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc); void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode); 
+void ldlm_lock_allow_match(struct ldlm_lock *lock); int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *, - __u32 type, void *cookie, int cookielen, ldlm_mode_t mode, + __u32 type, ldlm_policy_data_t *, ldlm_mode_t mode, struct lustre_handle *); struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, int *flags); @@ -465,15 +509,18 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data); int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request *req, struct ldlm_namespace *ns, - struct lustre_handle *parent_lock_handle, struct ldlm_res_id, __u32 type, - void *cookie, int cookielen, + ldlm_policy_data_t *, ldlm_mode_t mode, int *flags, + ldlm_blocking_callback blocking, ldlm_completion_callback completion, - ldlm_blocking_callback callback, + ldlm_glimpse_callback glimpse, void *data, + void *lvb, + __u32 lvb_len, + void *lvb_swabber, struct lustre_handle *lockh); int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len); @@ -483,7 +530,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *, struct ldlm_res_id *, int flags, void *opaque); /* mds/handler.c */ -/* This has to be here because recurisve inclusion sucks. */ +/* This has to be here because recursive inclusion sucks. 
*/ int intent_disposition(struct ldlm_reply *rep, int flag); void intent_set_disposition(struct ldlm_reply *rep, int flag); int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 3fa0a61..7de8875 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -453,6 +453,15 @@ struct ost_body { extern void lustre_swab_ost_body (struct ost_body *b); extern void lustre_swab_ost_last_id(obd_id *id); +/* lock value block communicated between the filter and llite */ + +struct ost_lvb { + __u64 lvb_size; + __u64 lvb_time; +}; + +extern void lustre_swab_ost_lvb(struct ost_lvb *); + /* * MDS REQ RECORDS */ @@ -691,13 +700,12 @@ typedef enum { LDLM_CANCEL = 103, LDLM_BL_CALLBACK = 104, LDLM_CP_CALLBACK = 105, + LDLM_GL_CALLBACK = 106, LDLM_LAST_OPC } ldlm_cmd_t; #define LDLM_FIRST_OPC LDLM_ENQUEUE #define RES_NAME_SIZE 4 -#define RES_VERSION_SIZE 4 - struct ldlm_res_id { __u64 name[RES_NAME_SIZE]; }; @@ -746,12 +754,10 @@ struct ldlm_intent { extern void lustre_swab_ldlm_intent (struct ldlm_intent *i); -/* Note this unaligned structure; as long as it's only used in ldlm_request - * below, we're probably fine. 
*/ struct ldlm_resource_desc { __u32 lr_type; + __u32 lr_padding; struct ldlm_res_id lr_name; - __u32 lr_version[RES_VERSION_SIZE]; } __attribute__((packed)); extern void lustre_swab_ldlm_resource_desc (struct ldlm_resource_desc *r); @@ -761,13 +767,13 @@ struct ldlm_lock_desc { ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; ldlm_policy_data_t l_policy_data; - __u32 l_version[RES_VERSION_SIZE]; } __attribute__((packed)); extern void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l); struct ldlm_request { __u32 lock_flags; + __u32 lock_padding; struct ldlm_lock_desc lock_desc; struct lustre_handle lock_handle1; struct lustre_handle lock_handle2; @@ -777,10 +783,8 @@ extern void lustre_swab_ldlm_request (struct ldlm_request *rq); struct ldlm_reply { __u32 lock_flags; - __u32 lock_mode; - struct ldlm_res_id lock_resource_name; + struct ldlm_lock_desc lock_desc; struct lustre_handle lock_handle; - ldlm_policy_data_t lock_policy_data; __u64 lock_policy_res1; __u64 lock_policy_res2; }; diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 8a0b3f0..860c6b8 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -626,6 +626,7 @@ int lustre_msg_size(int count, int *lengths); int lustre_unpack_msg(struct lustre_msg *m, int len); void *lustre_msg_buf(struct lustre_msg *m, int n, int minlen); char *lustre_msg_string (struct lustre_msg *m, int n, int max_len); +void *lustre_swab_buf(struct lustre_msg *, int n, int minlen, void *swabber); void *lustre_swab_reqbuf (struct ptlrpc_request *req, int n, int minlen, void *swabber); void *lustre_swab_repbuf (struct ptlrpc_request *req, int n, int minlen, diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index ec90c84..c43c62d 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -54,7 +54,11 @@ struct lov_oinfo { /* per-stripe data structure */ struct loi_oap_pages loi_write_lop; /* _cli_ is poorly named, it should be 
_ready_ */ struct list_head loi_cli_item; - struct list_head loi_write_item; + struct list_head loi_write_item; + + __u64 loi_kms; /* known minimum size */ + __u64 loi_rss; /* recently seen size */ + __u64 loi_mtime; /* recently seen mtime */ }; static inline void loi_init(struct lov_oinfo *loi) @@ -605,17 +609,16 @@ struct obd_ops { int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *local, struct obd_trans_info *oti); - int (*o_enqueue)(struct obd_export *exp, struct lov_stripe_md *md, - struct lustre_handle *parent_lock, - __u32 type, void *cookie, int cookielen, __u32 mode, - int *flags, void *cb, void *data, + int (*o_enqueue)(struct obd_export *, struct lov_stripe_md *, + __u32 type, ldlm_policy_data_t *, __u32 mode, + int *flags, void *bl_cb, void *cp_cb, void *gl_cb, + void *data, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh); - int (*o_match)(struct obd_export *exp, struct lov_stripe_md *md, - __u32 type, void *cookie, int cookielen, __u32 mode, - int *flags, void *data, struct lustre_handle *lockh); - int (*o_change_cbdata)(struct obd_export *exp, - struct lov_stripe_md *lsm, ldlm_iterator_t it, - void *data); + int (*o_match)(struct obd_export *, struct lov_stripe_md *, __u32 type, + ldlm_policy_data_t *, __u32 mode, int *flags, void *data, + struct lustre_handle *lockh); + int (*o_change_cbdata)(struct obd_export *, struct lov_stripe_md *, + ldlm_iterator_t it, void *data); int (*o_cancel)(struct obd_export *, struct lov_stripe_md *md, __u32 mode, struct lustre_handle *); int (*o_cancel_unused)(struct obd_export *, struct lov_stripe_md *, @@ -632,11 +635,6 @@ struct obd_ops { int count, struct llog_logid *logid); int (*o_llog_finish)(struct obd_device *obd, int count); - /* only until proper file size mechanics arrive */ - int (*o_lock_contains)(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct ldlm_lock *lock, obd_off offset); - /* metadata-only methods */ int (*o_pin)(struct obd_export *, obd_id ino, 
__u32 gen, int type, struct obd_client_handle *, int flag); @@ -651,7 +649,6 @@ struct obd_ops { * to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c. * Also, add a wrapper function in include/linux/obd_class.h. */ - }; diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 3e1a512..71790e8 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -852,12 +852,11 @@ static inline int obd_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(rc); } -static inline int obd_enqueue(struct obd_export *exp, - struct lov_stripe_md *ea, - struct lustre_handle *parent_lock, - __u32 type, void *cookie, int cookielen, - __u32 mode, int *flags, void *cb, void *data, - struct lustre_handle *lockh) +static inline int obd_enqueue(struct obd_export *exp, struct lov_stripe_md *ea, + __u32 type, ldlm_policy_data_t *policy, + __u32 mode, int *flags, void *bl_cb, void *cp_cb, + void *gl_cb, void *data, __u32 lvb_len, + void *lvb_swabber, struct lustre_handle *lockh) { int rc; ENTRY; @@ -865,16 +864,15 @@ static inline int obd_enqueue(struct obd_export *exp, EXP_CHECK_OP(exp, enqueue); OBD_COUNTER_INCREMENT(exp->exp_obd, enqueue); - rc = OBP(exp->exp_obd, enqueue)(exp, ea, parent_lock, type, - cookie, cookielen, mode, flags, cb, - data, lockh); + rc = OBP(exp->exp_obd, enqueue)(exp, ea, type, policy, mode, flags, + bl_cb, cp_cb, gl_cb, data, lvb_len, + lvb_swabber, lockh); RETURN(rc); } -static inline int obd_match(struct obd_export *exp, - struct lov_stripe_md *ea, __u32 type, void *cookie, - int cookielen, __u32 mode, int *flags, void *data, - struct lustre_handle *lockh) +static inline int obd_match(struct obd_export *exp, struct lov_stripe_md *ea, + __u32 type, ldlm_policy_data_t *policy, __u32 mode, + int *flags, void *data, struct lustre_handle *lockh) { int rc; ENTRY; @@ -882,12 +880,11 @@ static inline int obd_match(struct obd_export *exp, EXP_CHECK_OP(exp, match); OBD_COUNTER_INCREMENT(exp->exp_obd, 
match); - rc = OBP(exp->exp_obd, match)(exp, ea, type, cookie, cookielen, mode, - flags, data, lockh); + rc = OBP(exp->exp_obd, match)(exp, ea, type, policy, mode, flags, data, + lockh); RETURN(rc); } - static inline int obd_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, ldlm_iterator_t it, void *data) @@ -971,20 +968,6 @@ static inline int obd_unpin(struct obd_export *exp, return(rc); } -static inline int obd_lock_contains(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct ldlm_lock *lock, obd_off offset) -{ - int rc; - ENTRY; - - EXP_CHECK_OP(exp, lock_contains); - OBD_COUNTER_INCREMENT(exp->exp_obd, lock_contains); - - rc = OBP(exp->exp_obd, lock_contains)(exp, lsm, lock, offset); - RETURN(rc); -} - static inline void obd_invalidate_import(struct obd_device *obd, struct obd_import *imp) { diff --git a/lustre/include/linux/obd_echo.h b/lustre/include/linux/obd_echo.h index 68c0d6e..5ff5e6c 100644 --- a/lustre/include/linux/obd_echo.h +++ b/lustre/include/linux/obd_echo.h @@ -27,7 +27,7 @@ struct ec_lock { struct ec_object *ecl_object; __u64 ecl_cookie; struct lustre_handle ecl_lock_handle; - struct ldlm_extent ecl_extent; + ldlm_policy_data_t ecl_policy; __u32 ecl_mode; }; diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index 9c0e65b..c1a7d13 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -110,6 +110,7 @@ extern unsigned int obd_sync_filter; #define OBD_FAIL_LDLM_CANCEL 0x304 #define OBD_FAIL_LDLM_BL_CALLBACK 0x305 #define OBD_FAIL_LDLM_CP_CALLBACK 0x306 +#define OBD_FAIL_LDLM_GL_CALLBACK 0x307 #define OBD_FAIL_OSC 0x400 #define OBD_FAIL_OSC_BRW_READ_BULK 0x401 diff --git a/lustre/include/linux/rbtree.h b/lustre/include/linux/rbtree.h deleted file mode 100644 index e35ddc7..0000000 --- a/lustre/include/linux/rbtree.h +++ /dev/null @@ -1,132 +0,0 @@ -/* - Red Black Trees - (C) 1999 Andrea Arcangeli - - This program is free software; you can redistribute 
it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - linux/include/linux/rbtree.h - - To use rbtrees you'll have to implement your own insert and search cores. - This will avoid us to use callbacks and to drop drammatically performances. - I know it's not the cleaner way, but in C (not in C++) to get - performances and genericity... - - Some example of insert and search follows here. The search is a plain - normal search over an ordered tree. The insert instead must be implemented - int two steps: as first thing the code must insert the element in - order as a red leaf in the tree, then the support library function - rb_insert_color() must be called. Such function will do the - not trivial work to rebalance the rbtree if necessary. 
- ------------------------------------------------------------------------ -static inline struct page * rb_search_page_cache(struct inode * inode, - unsigned long offset) -{ - rb_node_t * n = inode->i_rb_page_cache.rb_node; - struct page * page; - - while (n) - { - page = rb_entry(n, struct page, rb_page_cache); - - if (offset < page->offset) - n = n->rb_left; - else if (offset > page->offset) - n = n->rb_right; - else - return page; - } - return NULL; -} - -static inline struct page * __rb_insert_page_cache(struct inode * inode, - unsigned long offset, - rb_node_t * node) -{ - rb_node_t ** p = &inode->i_rb_page_cache.rb_node; - rb_node_t * parent = NULL; - struct page * page; - - while (*p) - { - parent = *p; - page = rb_entry(parent, struct page, rb_page_cache); - - if (offset < page->offset) - p = &(*p)->rb_left; - else if (offset > page->offset) - p = &(*p)->rb_right; - else - return page; - } - - rb_link_node(node, parent, p); - - return NULL; -} - -static inline struct page * rb_insert_page_cache(struct inode * inode, - unsigned long offset, - rb_node_t * node) -{ - struct page * ret; - if ((ret = __rb_insert_page_cache(inode, offset, node))) - goto out; - rb_insert_color(node, &inode->i_rb_page_cache); - out: - return ret; -} ------------------------------------------------------------------------ -*/ - -#ifndef _LINUX_RBTREE_H -#define _LINUX_RBTREE_H - -typedef struct rb_node_s -{ - struct rb_node_s * rb_parent; - int rb_color; -#define RB_RED 0 -#define RB_BLACK 1 - struct rb_node_s * rb_right; - struct rb_node_s * rb_left; -} -rb_node_t; - -typedef struct rb_root_s -{ - struct rb_node_s * rb_node; -} -rb_root_t; - -#define RB_ROOT (rb_root_t) { NULL, } -#define rb_entry(ptr, type, member) \ - ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) - -extern void rb_insert_color(rb_node_t *, rb_root_t *); -extern void rb_erase(rb_node_t *, rb_root_t *); -extern rb_node_t *rb_get_first(rb_root_t *root); -extern rb_node_t *rb_get_next(rb_node_t *n); 
- -static inline void rb_link_node(rb_node_t * node, rb_node_t * parent, rb_node_t ** rb_link) -{ - node->rb_parent = parent; - node->rb_color = RB_RED; - node->rb_left = node->rb_right = NULL; - - *rb_link = node; -} - -#endif /* _LINUX_RBTREE_H */ diff --git a/lustre/kernel_patches/patches/2.6.0-test6-mm4.patch b/lustre/kernel_patches/patches/2.6.0-test6-mm4.patch index ff8d63b..a32f010 100644 --- a/lustre/kernel_patches/patches/2.6.0-test6-mm4.patch +++ b/lustre/kernel_patches/patches/2.6.0-test6-mm4.patch @@ -14430,7 +14430,7 @@ +++ 25/arch/parisc/lib/checksum.c 2003-10-05 00:33:23.000000000 -0700 @@ -16,8 +16,10 @@ * - * $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ + * $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ */ -#include +#include @@ -31511,8 +31511,8 @@ --- linux-2.6.0-test6/drivers/char/ftape/compressor/zftape-compress.c 2003-06-14 12:18:32.000000000 -0700 +++ 25/drivers/char/ftape/compressor/zftape-compress.c 2003-10-05 00:33:24.000000000 -0700 @@ -31,6 +31,7 @@ - char zftc_rev[] = "$Revision: 1.4 $"; - char zftc_dat[] = "$Date: 2004/02/14 03:14:33 $"; + char zftc_rev[] = "$Revision: 1.5 $"; + char zftc_dat[] = "$Date: 2004/02/23 23:37:02 $"; +#include #include @@ -37169,8 +37169,8 @@ --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/divamnt.c 2003-09-27 18:57:44.000000000 -0700 +++ 25/drivers/isdn/hardware/eicon/divamnt.c 2003-10-05 00:33:24.000000000 -0700 @@ -1,4 +1,4 @@ --/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ -+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ +-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ ++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $ * * Driver for Eicon DIVA Server ISDN cards. 
* Maint module @@ -37181,16 +37181,16 @@ -#include "di_defs.h" #include "debug_if.h" --static char *main_revision = "$Revision: 1.4 $"; -+static char *main_revision = "$Revision: 1.4 $"; +-static char *main_revision = "$Revision: 1.5 $"; ++static char *main_revision = "$Revision: 1.5 $"; static int major; --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/divasmain.c 2003-09-27 18:57:44.000000000 -0700 +++ 25/drivers/isdn/hardware/eicon/divasmain.c 2003-10-05 00:33:24.000000000 -0700 @@ -1,4 +1,4 @@ --/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ -+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ +-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ ++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $ * * Low level driver for Eicon DIVA Server ISDN cards. * @@ -37212,16 +37212,16 @@ #include "diva_dma.h" #include "diva_pci.h" --static char *main_revision = "$Revision: 1.4 $"; -+static char *main_revision = "$Revision: 1.4 $"; +-static char *main_revision = "$Revision: 1.5 $"; ++static char *main_revision = "$Revision: 1.5 $"; static int major; --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/dqueue.c 2003-06-14 12:18:22.000000000 -0700 +++ 25/drivers/isdn/hardware/eicon/dqueue.c 2003-10-05 00:33:24.000000000 -0700 @@ -1,10 +1,10 @@ --/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ -+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ +-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ ++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $ * * Driver for Eicon DIVA Server ISDN cards. 
* User Mode IDI Interface @@ -37236,8 +37236,8 @@ --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/mntfunc.c 2003-09-27 18:57:44.000000000 -0700 +++ 25/drivers/isdn/hardware/eicon/mntfunc.c 2003-10-05 00:33:24.000000000 -0700 @@ -1,4 +1,4 @@ --/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ -+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ +-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ ++/* Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ * * Driver for Eicon DIVA Server ISDN cards. * Maint module @@ -37252,8 +37252,8 @@ --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/os_capi.h 2003-06-14 12:18:25.000000000 -0700 +++ 25/drivers/isdn/hardware/eicon/os_capi.h 2003-10-05 00:33:24.000000000 -0700 @@ -1,10 +1,10 @@ --/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ -+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ +-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ ++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $ * * ISDN interface module for Eicon active cards DIVA. 
* CAPI Interface OS include files @@ -37268,8 +37268,8 @@ --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/platform.h 2003-09-27 18:57:44.000000000 -0700 +++ 25/drivers/isdn/hardware/eicon/platform.h 2003-10-05 00:33:24.000000000 -0700 @@ -1,4 +1,4 @@ --/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ -+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ +-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ ++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $ * * platform.h * @@ -37754,7 +37754,7 @@ +++ 25/drivers/media/video/planb.c 2003-10-05 00:33:24.000000000 -0700 @@ -27,7 +27,6 @@ - /* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ */ + /* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ */ -#include #include @@ -38069,7 +38069,7 @@ --- linux-2.6.0-test6/drivers/mtd/chips/map_rom.c 2003-06-14 12:18:24.000000000 -0700 +++ 25/drivers/mtd/chips/map_rom.c 2003-10-05 00:33:24.000000000 -0700 @@ -4,7 +4,6 @@ - * $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ + * $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ */ -#include @@ -42159,8 +42159,8 @@ #include /* Version */ --static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ for Linux\n"; -+static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ for Linux\n"; +-static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ for Linux\n"; ++static const char version[] = "Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $ for Linux\n"; static int debug; static int quartz; diff --git a/lustre/kernel_patches/patches/bproc-patch-2.4.20 b/lustre/kernel_patches/patches/bproc-patch-2.4.20 index 54d1f68..90d86c2 100644 --- a/lustre/kernel_patches/patches/bproc-patch-2.4.20 +++ b/lustre/kernel_patches/patches/bproc-patch-2.4.20 @@ -1,5 +1,3 @@ -$Id: 
bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $ - Index: linux/fs/exec.c =================================================================== --- linux.orig/fs/exec.c 2003-09-03 17:52:00.000000000 -0400 @@ -764,7 +762,7 @@ Index: linux/kernel/bproc_hook.c + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * -+ * $Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $ ++ * Id: bproc-patch-2.4.20,v 1.3.2.1 2004/02/14 07:21:44 nic Exp $ + *-----------------------------------------------------------------------*/ +#include +#include @@ -832,7 +830,7 @@ Index: linux/include/linux/bproc.h + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * -+ * $Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $ ++ * Id: bproc-patch-2.4.20,v 1.3.2.1 2004/02/14 07:21:44 nic Exp $ + *-----------------------------------------------------------------------*/ +#ifndef _LINUX_BPROC_H +#define _LINUX_BPROC_H diff --git a/lustre/ldlm/l_lock.c b/lustre/ldlm/l_lock.c index 2a4f832..d1f8c56 100644 --- a/lustre/ldlm/l_lock.c +++ b/lustre/ldlm/l_lock.c @@ -121,7 +121,7 @@ void l_check_no_ns_lock(struct ldlm_namespace *ns) static unsigned long next_msg; if (l_has_lock(&ns->ns_lock) && time_after(jiffies, next_msg)) { - CERROR("namespace %s lock held during RPCs; tell phil\n", + CERROR("namespace %s lock held illegally; tell phil\n", ns->ns_name); #if (LUSTRE_KERNEL_VERSION >= 30) CERROR(portals_debug_dumpstack()); diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 9c29dbc..32fb89d 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -175,7 +175,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex); if (new_ex.start != lock->l_policy_data.l_extent.start || - 
new_ex.end != lock->l_policy_data.l_extent.end) { + new_ex.end != lock->l_policy_data.l_extent.end) { *flags |= LDLM_FL_LOCK_CHANGED; lock->l_policy_data.l_extent.start = new_ex.start; lock->l_policy_data.l_extent.end = new_ex.end; @@ -209,3 +209,32 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, } RETURN(0); } + +/* When a lock is cancelled by a client, the KMS may undergo change if this + * is the "highest lock". This function returns the new KMS value. + * + * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */ +__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms) +{ + struct ldlm_resource *res = lock->l_resource; + struct list_head *tmp; + struct ldlm_lock *lck; + __u64 kms = 0; + ENTRY; + + l_lock(&res->lr_namespace->ns_lock); + list_for_each(tmp, &res->lr_granted) { + lck = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (lock == lck) + continue; + if (lck->l_policy_data.l_extent.end >= old_kms) + GOTO(out, kms = old_kms); + kms = lck->l_policy_data.l_extent.end + 1; + } + + GOTO(out, kms); + out: + l_unlock(&res->lr_namespace->ns_lock); + return kms; +} diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 57c09c5..181c72e 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -312,7 +312,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, * release the ns_lock, allocate the new lock, * and restart processing this lock. 
*/ new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK, - lock->l_granted_mode, NULL, NULL, NULL); + lock->l_granted_mode, NULL, NULL, NULL, + NULL, 0); if (!new2) { ldlm_flock_destroy(req, lock->l_granted_mode, *flags); *err = -ENOLCK; diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index abd0f2e..4186f5c 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -8,9 +8,10 @@ struct ldlm_lock * ldlm_lock_create(struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, struct ldlm_res_id, __u32 type, ldlm_mode_t, ldlm_blocking_callback, - ldlm_completion_callback, void *data); + ldlm_completion_callback, ldlm_glimpse_callback, void *data, + __u32 lvb_len); ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, - void *cookie, int cookie_len, int *flags); + void *cookie, int *flags); void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode); void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode); void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, @@ -18,9 +19,6 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue); int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list); -typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags, - int first_enq, ldlm_error_t *err); - /* ldlm_plain.c */ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 5fde33e..bb0c0c1 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -89,16 +89,14 @@ static ldlm_processing_policy ldlm_processing_policy_table[] = { #endif }; -static ldlm_res_policy ldlm_intent_policy_func; - -void ldlm_register_intent(ldlm_res_policy arg) +ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res) { - 
ldlm_intent_policy_func = arg; + return ldlm_processing_policy_table[res->lr_type]; } -void ldlm_unregister_intent(void) +void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg) { - ldlm_intent_policy_func = NULL; + ns->ns_policy = arg; } /* @@ -142,6 +140,9 @@ void ldlm_lock_put(struct ldlm_lock *lock) if (lock->l_parent) LDLM_LOCK_PUT(lock->l_parent); + if (lock->l_lvb_data != NULL) + OBD_FREE(lock->l_lvb_data, lock->l_lvb_len); + OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock)); l_unlock(&ns->ns_lock); } @@ -375,7 +376,6 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) desc->l_granted_mode = lock->l_granted_mode; memcpy(&desc->l_policy_data, &lock->l_policy_data, sizeof(desc->l_policy_data)); - memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version)); } void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, @@ -434,6 +434,7 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) lock->l_readers++; else lock->l_writers++; + lock->l_last_used = jiffies; l_unlock(&lock->l_resource->lr_namespace->ns_lock); LDLM_LOCK_GET(lock); LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]); @@ -467,7 +468,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) (lock->l_flags & LDLM_FL_CBPENDING)) { /* If we received a blocked AST and this was the last reference, * run the callback. 
*/ - if (!ns->ns_client && lock->l_export) + if (ns->ns_client == LDLM_NAMESPACE_SERVER && lock->l_export) CERROR("FL_CBPENDING set on non-local lock--just a " "warning\n"); @@ -479,7 +480,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) if (lock->l_blocking_ast != NULL) lock->l_blocking_ast(lock, NULL, lock->l_ast_data, LDLM_CB_BLOCKING); - } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) { + } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT && + !lock->l_readers && !lock->l_writers) { /* If this is a client-side namespace and this was the last * reference, put it on the LRU. */ LASSERT(list_empty(&lock->l_lru)); @@ -550,7 +552,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen, /* returns a referenced lock or NULL. See the flag descriptions below, in the * comment above ldlm_lock_match */ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, - struct ldlm_extent *extent, + ldlm_policy_data_t *policy, struct ldlm_lock *old_lock, int flags) { struct ldlm_lock *lock; @@ -579,8 +581,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, continue; if (lock->l_resource->lr_type == LDLM_EXTENT && - (lock->l_policy_data.l_extent.start > extent->start || - lock->l_policy_data.l_extent.end < extent->end)) + (lock->l_policy_data.l_extent.start > + policy->l_extent.start || + lock->l_policy_data.l_extent.end < policy->l_extent.end)) continue; if (lock->l_destroyed) @@ -597,6 +600,14 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, return NULL; } +void ldlm_lock_allow_match(struct ldlm_lock *lock) +{ + l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock->l_flags |= LDLM_FL_CAN_MATCH; + wake_up(&lock->l_waitq); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); +} + /* Can be called in two ways: * * If 'ns' is NULL, then lockh describes an existing lock that we want to look @@ -616,8 +627,8 @@ static struct 
ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, * case, lockh is filled in with a addref()ed lock */ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, - struct ldlm_res_id *res_id, __u32 type, void *cookie, - int cookielen, ldlm_mode_t mode, + struct ldlm_res_id *res_id, __u32 type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, struct lustre_handle *lockh) { struct ldlm_resource *res; @@ -643,15 +654,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, l_lock(&ns->ns_lock); - lock = search_queue(&res->lr_granted, mode, cookie, old_lock, flags); + lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags); if (lock != NULL) GOTO(out, rc = 1); if (flags & LDLM_FL_BLOCK_GRANTED) GOTO(out, rc = 0); - lock = search_queue(&res->lr_converting, mode, cookie, old_lock, flags); + lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags); if (lock != NULL) GOTO(out, rc = 1); - lock = search_queue(&res->lr_waiting, mode, cookie, old_lock, flags); + lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags); if (lock != NULL) GOTO(out, rc = 1); @@ -661,10 +672,17 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, l_unlock(&ns->ns_lock); if (lock) { + struct l_wait_info lwi; ldlm_lock2handle(lock, lockh); if (lock->l_completion_ast) lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL); + + lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL); + + /* XXX FIXME see comment about CAN_MATCH in lustre_dlm.h */ + l_wait_event(lock->l_waitq, + (lock->l_flags & LDLM_FL_CAN_MATCH), &lwi); } if (rc) LDLM_DEBUG(lock, "matched"); @@ -684,7 +702,8 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, ldlm_mode_t mode, ldlm_blocking_callback blocking, ldlm_completion_callback completion, - void *data) + ldlm_glimpse_callback glimpse, + void *data, __u32 lvb_len) { struct ldlm_resource *res, *parent_res = NULL; struct ldlm_lock *lock, *parent_lock = NULL; @@ -712,13 +731,21 @@ struct 
ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, lock->l_ast_data = data; lock->l_blocking_ast = blocking; lock->l_completion_ast = completion; + lock->l_glimpse_ast = glimpse; + + lock->l_lvb_len = lvb_len; + OBD_ALLOC(lock->l_lvb_data, lvb_len); + if (lock->l_lvb_data == NULL) { + OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock)); + RETURN(NULL); + } RETURN(lock); } ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, struct ldlm_lock **lockp, - void *cookie, int cookie_len, int *flags) + void *cookie, int *flags) { struct ldlm_lock *lock = *lockp; struct ldlm_resource *res = lock->l_resource; @@ -727,20 +754,20 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, ldlm_error_t rc = ELDLM_OK; ENTRY; - if (res->lr_type != LDLM_PLAIN) - memcpy(&lock->l_policy_data, cookie, cookie_len); - /* policies are not executed on the client or during replay */ if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT - && !local && ldlm_intent_policy_func) { - rc = ldlm_intent_policy_func(ns, lockp, cookie, - lock->l_req_mode, *flags, NULL); + && !local && ns->ns_policy) { + rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags, + NULL); if (rc == ELDLM_LOCK_REPLACED) { /* The lock that was returned has already been granted, - * and placed into lockp. Destroy the old one and our + * and placed into lockp. If it's not the same as the + * one we passed in, then destroy the old one and our * work here is done. 
*/ - ldlm_lock_destroy(lock); - LDLM_LOCK_PUT(lock); + if (lock != *lockp) { + ldlm_lock_destroy(lock); + LDLM_LOCK_PUT(lock); + } *flags |= LDLM_FL_LOCK_CHANGED; RETURN(0); } else if (rc == ELDLM_LOCK_ABORTED || @@ -1086,19 +1113,14 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos) if (!((portal_debug | D_ERROR) & level)) return; - if (RES_VERSION_SIZE != 4) - LBUG(); - if (!lock) { CDEBUG(level, " NULL LDLM lock\n"); return; } - CDEBUG(level, - " -- Lock dump: %p/"LPX64" (%x %x %x %x) (rc: %d) (pos: %d)\n", - lock, lock->l_handle.h_cookie, lock->l_version[0], - lock->l_version[1], lock->l_version[2], lock->l_version[3], - atomic_read(&lock->l_refc), pos); + CDEBUG(level, " -- Lock dump: %p/"LPX64" (rc: %d) (pos: %d)\n", + lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), + pos); if (lock->l_conn_export != NULL) obd = lock->l_conn_export->exp_obd; if (lock->l_export && lock->l_export->exp_connection) { diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 2d7946b..a9020d3 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -388,7 +388,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, req->rq_send_state = LUSTRE_IMP_FULL; req->rq_timeout = 2; /* 2 second timeout for initial AST reply */ rc = ptlrpc_queue_wait(req); - if (rc == -ETIMEDOUT || rc == -EINTR) { + if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) { LASSERT(lock->l_export); if (lock->l_export->exp_libclient) { CDEBUG(D_HA, "BLOCKING AST to liblustre client (nid " @@ -445,13 +445,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) struct ptlrpc_request *req; struct timeval granted_time; long total_enqueue_wait; - int rc = 0, size = sizeof(*body); + int rc = 0, size[2] = {sizeof(*body)}, buffers = 1; ENTRY; - if (lock == NULL) { - LBUG(); - RETURN(-EINVAL); - } + LASSERT(lock != NULL); do_gettimeofday(&granted_time); total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time); @@ -459,9 
+456,14 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) if (total_enqueue_wait / 1000000 > obd_timeout) LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait); + if (lock->l_resource->lr_lvb_len) { + buffers = 2; + size[1] = lock->l_resource->lr_lvb_len; + } + req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, - LDLM_CP_CALLBACK, 1, &size, NULL); - if (!req) + LDLM_CP_CALLBACK, buffers, size, NULL); + if (req == NULL) RETURN(-ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); @@ -470,6 +472,13 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) body->lock_flags = flags; ldlm_lock2desc(lock, &body->lock_desc); + if (buffers == 2) { + void *lvb = lustre_msg_buf(req->rq_reqmsg, 1, + lock->l_resource->lr_lvb_len); + memcpy(lvb, lock->l_resource->lr_lvb_data, + lock->l_resource->lr_lvb_len); + } + LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)", total_enqueue_wait); req->rq_replen = lustre_msg_size(0, NULL); @@ -486,7 +495,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) l_unlock(&lock->l_resource->lr_namespace->ns_lock); rc = ptlrpc_queue_wait(req); - if (rc == -ETIMEDOUT || rc == -EINTR) { + if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) { ldlm_del_waiting_lock(lock); ldlm_failed_ast(lock, rc, "completion"); } else if (rc) { @@ -502,14 +511,57 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) RETURN(rc); } +int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) +{ + struct ldlm_resource *res = lock->l_resource; + struct ldlm_request *body; + struct ptlrpc_request *req; + int rc = 0, size = sizeof(*body); + ENTRY; + + LASSERT(lock != NULL); + + req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, + LDLM_GL_CALLBACK, 1, &size, NULL); + if (req == NULL) + RETURN(-ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body)); + memcpy(&body->lock_handle1, 
&lock->l_remote_handle, + sizeof(body->lock_handle1)); + ldlm_lock2desc(lock, &body->lock_desc); + + size = lock->l_resource->lr_lvb_len; + req->rq_replen = lustre_msg_size(1, &size); + + req->rq_send_state = LUSTRE_IMP_FULL; + req->rq_timeout = 2; /* 2 second timeout for initial AST reply */ + + rc = ptlrpc_queue_wait(req); + if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) { + ldlm_del_waiting_lock(lock); + ldlm_failed_ast(lock, rc, "glimpse"); + } else if (rc) { + LDLM_ERROR(lock, "client sent rc %d rq_status %d from " + "completion AST\n", rc, req->rq_status); + ldlm_lock_cancel(lock); + } else { + rc = res->lr_namespace->ns_lvbo->lvbo_update(res, + req->rq_repmsg, 0); + } + ptlrpc_req_finished(req); + RETURN(rc); +} + int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback completion_callback, - ldlm_blocking_callback blocking_callback) + ldlm_blocking_callback blocking_callback, + ldlm_glimpse_callback glimpse_callback) { struct obd_device *obddev = req->rq_export->exp_obd; struct ldlm_reply *dlm_rep; struct ldlm_request *dlm_req; - int rc, size = sizeof(*dlm_rep), cookielen = 0; + int rc, size[2] = {sizeof(*dlm_rep)}; __u32 flags; ldlm_error_t err; struct ldlm_lock *lock = NULL; @@ -526,31 +578,14 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, } flags = dlm_req->lock_flags; - if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN && - (flags & LDLM_FL_HAS_INTENT)) { - /* In this case, the reply buffer is allocated deep in - * local_lock_enqueue by the policy function. 
*/ - cookie = req; - cookielen = sizeof(*req); - } else { - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) { - CERROR("out of memory\n"); - RETURN(-ENOMEM); - } - if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) { - cookie = &dlm_req->lock_desc.l_policy_data; - cookielen = sizeof(ldlm_policy_data_t); - } - } /* The lock's callback data might be set in the policy function */ - lock = ldlm_lock_create(obddev->obd_namespace, - &dlm_req->lock_handle2, + lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2, dlm_req->lock_desc.l_resource.lr_name, dlm_req->lock_desc.l_resource.lr_type, dlm_req->lock_desc.l_req_mode, - blocking_callback, completion_callback, NULL); + blocking_callback, completion_callback, + glimpse_callback, NULL, 0); if (!lock) GOTO(out, err = -ENOMEM); @@ -566,24 +601,35 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, &lock->l_export->exp_ldlm_data.led_held_locks); l_unlock(&lock->l_resource->lr_namespace->ns_lock); - err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen, - &flags); + if (flags & LDLM_FL_HAS_INTENT) { + /* In this case, the reply buffer is allocated deep in + * local_lock_enqueue by the policy function. 
*/ + cookie = req; + } else { + int buffers = 1; + if (lock->l_resource->lr_lvb_len) { + size[1] = lock->l_resource->lr_lvb_len; + buffers = 2; + } + + rc = lustre_pack_reply(req, buffers, size, NULL); + if (rc) + RETURN(rc); + } + + if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) + memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data, + sizeof(ldlm_policy_data_t)); + + err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags); if (err) GOTO(out, err); dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep)); dlm_rep->lock_flags = flags; + ldlm_lock2desc(lock, &dlm_rep->lock_desc); ldlm_lock2handle(lock, &dlm_rep->lock_handle); - if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) { - memcpy(&dlm_rep->lock_policy_data, &lock->l_policy_data, - cookielen); - } - if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) { - memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name, - sizeof(dlm_rep->lock_resource_name)); - dlm_rep->lock_mode = lock->l_req_mode; - } /* We never send a blocking AST until the lock is granted, but * we can tell it right now */ @@ -597,6 +643,12 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, EXIT; out: + if (lock->l_resource->lr_lvb_len > 0) { + void *lvb = lustre_msg_buf(req->rq_repmsg, 1, + lock->l_resource->lr_lvb_len); + memcpy(lvb, lock->l_resource->lr_lvb_data, + lock->l_resource->lr_lvb_len); + } req->rq_status = err; /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this @@ -662,11 +714,12 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; struct ldlm_lock *lock; + struct ldlm_resource *res; char str[PTL_NALFMT_SIZE]; int rc; ENTRY; - dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req), + dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req), lustre_swab_ldlm_request); if (dlm_req == NULL) { CERROR("bad request buffer for cancel\n"); @@ -694,10 +747,18 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) req->rq_status = ESTALE; } else { 
LDLM_DEBUG(lock, "server-side cancel handler START"); + res = lock->l_resource; + if (res && res->lr_namespace->ns_lvbo && + res->lr_namespace->ns_lvbo->lvbo_update) { + (void)res->lr_namespace->ns_lvbo->lvbo_update + (res, NULL, 0); + //(res, req->rq_reqmsg, 1); + } + ldlm_lock_cancel(lock); if (ldlm_del_waiting_lock(lock)) CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock); - req->rq_status = 0; + req->rq_status = rc; } if (ptlrpc_reply(req) != 0) @@ -781,6 +842,18 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG(lock, "completion AST includes blocking AST"); } + if (lock->l_lvb_len) { + void *lvb; + lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len, + lock->l_lvb_swabber); + if (lvb == NULL) { + LDLM_ERROR(lock, "completion AST did not contain " + "expected LVB!"); + } else { + memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len); + } + } + lock->l_resource->lr_tmp = &ast_list; ldlm_grant_lock(lock, req, sizeof(*req), 1); lock->l_resource->lr_tmp = NULL; @@ -795,6 +868,37 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, EXIT; } +static void ldlm_handle_gl_callback(struct ptlrpc_request *req, + struct ldlm_namespace *ns, + struct ldlm_request *dlm_req, + struct ldlm_lock *lock) +{ + ENTRY; + + l_lock(&ns->ns_lock); + LDLM_DEBUG(lock, "client glimpse AST callback handler"); + + if (lock->l_glimpse_ast != NULL) { + l_unlock(&ns->ns_lock); + l_check_no_ns_lock(ns); + lock->l_glimpse_ast(lock, req); + l_lock(&ns->ns_lock); + } + + if (lock->l_granted_mode == LCK_PW && + !lock->l_readers && !lock->l_writers && + time_after(jiffies, lock->l_last_used + 10 * HZ)) { + l_unlock(&ns->ns_lock); + ldlm_handle_bl_callback(ns, NULL, lock); + EXIT; + return; + } + + l_unlock(&ns->ns_lock); + LDLM_LOCK_PUT(lock); + EXIT; +} + static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) { req->rq_status = rc; @@ -869,6 +973,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) 
OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) { OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0); + } else if (req->rq_reqmsg->opc == LDLM_GL_CALLBACK) { + OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0); } else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) { OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0); } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) { @@ -947,7 +1053,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) * cancelling right now, because it's unused, or have an intent result * in the reply, so we might have to push the responsibility for sending * the reply down into the AST handlers, alas. */ - if (req->rq_reqmsg->opc != LDLM_BL_CALLBACK) + if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) ldlm_callback_reply(req, 0); switch (req->rq_reqmsg->opc) { @@ -966,6 +1072,10 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) CDEBUG(D_INODE, "completion ast\n"); ldlm_handle_cp_callback(req, ns, dlm_req, lock); break; + case LDLM_GL_CALLBACK: + CDEBUG(D_INODE, "glimpse ast\n"); + ldlm_handle_gl_callback(req, ns, dlm_req, lock); + break; default: LBUG(); /* checked above */ } @@ -1322,14 +1432,18 @@ void __exit ldlm_exit(void) /* ldlm_flock.c */ EXPORT_SYMBOL(ldlm_flock_completion_ast); +/* ldlm_extent.c */ +EXPORT_SYMBOL(ldlm_extent_shift_kms); + /* ldlm_lock.c */ +EXPORT_SYMBOL(ldlm_get_processing_policy); EXPORT_SYMBOL(ldlm_lock2desc); EXPORT_SYMBOL(ldlm_register_intent); -EXPORT_SYMBOL(ldlm_unregister_intent); EXPORT_SYMBOL(ldlm_lockname); EXPORT_SYMBOL(ldlm_typename); EXPORT_SYMBOL(ldlm_lock2handle); EXPORT_SYMBOL(__ldlm_handle2lock); +EXPORT_SYMBOL(ldlm_lock_get); EXPORT_SYMBOL(ldlm_lock_put); EXPORT_SYMBOL(ldlm_lock_match); EXPORT_SYMBOL(ldlm_lock_cancel); @@ -1343,6 +1457,7 @@ EXPORT_SYMBOL(ldlm_lock_dump); EXPORT_SYMBOL(ldlm_lock_dump_handle); EXPORT_SYMBOL(ldlm_cancel_locks_for_export); EXPORT_SYMBOL(ldlm_reprocess_all_ns); +EXPORT_SYMBOL(ldlm_lock_allow_match); /* 
ldlm_request.c */ EXPORT_SYMBOL(ldlm_completion_ast); @@ -1360,6 +1475,7 @@ EXPORT_SYMBOL(ldlm_change_cbdata); /* ldlm_lockd.c */ EXPORT_SYMBOL(ldlm_server_blocking_ast); EXPORT_SYMBOL(ldlm_server_completion_ast); +EXPORT_SYMBOL(ldlm_server_glimpse_ast); EXPORT_SYMBOL(ldlm_handle_enqueue); EXPORT_SYMBOL(ldlm_handle_cancel); EXPORT_SYMBOL(ldlm_handle_convert); @@ -1378,6 +1494,8 @@ EXPORT_SYMBOL(ldlm_regression_stop); EXPORT_SYMBOL(ldlm_namespace_new); EXPORT_SYMBOL(ldlm_namespace_cleanup); EXPORT_SYMBOL(ldlm_namespace_free); +EXPORT_SYMBOL(ldlm_resource_get); +EXPORT_SYMBOL(ldlm_resource_putref); /* l_lock.c */ EXPORT_SYMBOL(l_lock); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 9c03aeb..f6045f8 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -73,27 +73,24 @@ int ldlm_expired_completion_wait(void *data) int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data) { - /* XXX ALLOCATE - 160 mytes */ + /* XXX ALLOCATE - 160 bytes */ struct lock_wait_data lwd; unsigned long irqflags; struct obd_device *obd; struct obd_import *imp = NULL; - int rc = 0; struct l_wait_info lwi; + int rc = 0; ENTRY; if (flags == LDLM_FL_WAIT_NOREPROC) goto noreproc; - if (flags == 0) { + if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | + LDLM_FL_BLOCK_CONV))) { wake_up(&lock->l_waitq); RETURN(0); } - if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | - LDLM_FL_BLOCK_CONV))) - RETURN(0); - LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, " "sleeping"); ldlm_lock_dump(D_OTHER, lock, 0); @@ -138,15 +135,16 @@ noreproc: } static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, - struct lustre_handle *parent_lockh, struct ldlm_res_id res_id, __u32 type, - void *cookie, int cookielen, + ldlm_policy_data_t *policy, ldlm_mode_t mode, int *flags, - ldlm_completion_callback completion, ldlm_blocking_callback blocking, - void *data, + ldlm_completion_callback completion, + ldlm_glimpse_callback 
glimpse, + void *data, __u32 lvb_len, + void *lvb_swabber, struct lustre_handle *lockh) { struct ldlm_lock *lock; @@ -158,8 +156,8 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, LBUG(); } - lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, - blocking, completion, data); + lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, + completion, glimpse, data, lvb_len); if (!lock) GOTO(out_nolock, err = -ENOMEM); LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); @@ -167,13 +165,16 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); lock->l_flags |= LDLM_FL_LOCAL; + lock->l_lvb_swabber = lvb_swabber; + if (policy != NULL) + memcpy(&lock->l_policy_data, policy, sizeof(*policy)); - err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags); + err = ldlm_lock_enqueue(ns, &lock, policy, flags); if (err != ELDLM_OK) GOTO(out, err); - if (type != LDLM_PLAIN) - memcpy(cookie, &lock->l_policy_data, cookielen); + if (policy != NULL) + memcpy(policy, &lock->l_policy_data, sizeof(*policy)); if ((*flags) & LDLM_FL_LOCK_CHANGED) memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id)); @@ -207,30 +208,32 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request *req, struct ldlm_namespace *ns, - struct lustre_handle *parent_lock_handle, struct ldlm_res_id res_id, __u32 type, - void *cookie, int cookielen, + ldlm_policy_data_t *policy, ldlm_mode_t mode, int *flags, - ldlm_completion_callback completion, ldlm_blocking_callback blocking, + ldlm_completion_callback completion, + ldlm_glimpse_callback glimpse, void *data, + void *lvb, + __u32 lvb_len, + void *lvb_swabber, struct lustre_handle *lockh) { struct ldlm_lock *lock; struct ldlm_request *body; struct ldlm_reply *reply; - int rc, size = sizeof(*body), req_passed_in = 1, is_replay; + int rc, size[2] = {sizeof(*body), 
lvb_len}, req_passed_in = 1; + int is_replay = *flags & LDLM_FL_REPLAY; ENTRY; - is_replay = *flags & LDLM_FL_REPLAY; - LASSERT(exp != NULL || !is_replay); - if (exp == NULL) { - rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, - type, cookie, cookielen, mode, - flags, completion, blocking, data, + LASSERT(!is_replay); + rc = ldlm_cli_enqueue_local(ns, res_id, type, policy, mode, + flags, blocking, completion, + glimpse, data, lvb_len, lvb_swabber, lockh); RETURN(rc); } @@ -242,23 +245,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, LDLM_DEBUG(lock, "client-side enqueue START"); LASSERT(exp == lock->l_conn_export); } else { - lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, - mode, blocking, completion, data); + lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, + completion, glimpse, data, lvb_len); if (lock == NULL) GOTO(out_nolock, rc = -ENOMEM); /* for the local lock, add the reference */ ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); - if (type != LDLM_PLAIN) - memcpy(&lock->l_policy_data, cookie, cookielen); + lock->l_lvb_swabber = lvb_swabber; + if (policy != NULL) + memcpy(&lock->l_policy_data, policy, sizeof(*policy)); LDLM_DEBUG(lock, "client-side enqueue START"); } if (req == NULL) { req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1, - &size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); + size, NULL); + if (req == NULL) + GOTO(out_lock, rc = -ENOMEM); req_passed_in = 0; } else if (req->rq_reqmsg->buflens[0] != sizeof(*body)) LBUG(); @@ -269,14 +273,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, body->lock_flags = *flags; memcpy(&body->lock_handle1, lockh, sizeof(*lockh)); - if (parent_lock_handle) - memcpy(&body->lock_handle2, parent_lock_handle, - sizeof(body->lock_handle2)); /* Continue as normal. 
*/ if (!req_passed_in) { - size = sizeof(*reply); - req->rq_replen = lustre_msg_size(1, &size); + int buffers = 1; + if (lvb_len > 0) + buffers = 2; + size[0] = sizeof(*reply); + req->rq_replen = lustre_msg_size(buffers, size); } lock->l_conn_export = exp; lock->l_export = NULL; @@ -289,24 +293,32 @@ int ldlm_cli_enqueue(struct obd_export *exp, LASSERT(!is_replay); LDLM_DEBUG(lock, "client-side enqueue END (%s)", rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED"); - failed_lock_cleanup(ns, lock, lockh, mode); if (rc == ELDLM_LOCK_ABORTED) { /* Before we return, swab the reply */ reply = lustre_swab_repbuf(req, 0, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR("Can't unpack ldlm_reply\n"); - GOTO(out_req, rc = -EPROTO); + rc = -EPROTO; + } + if (lvb_len) { + void *tmplvb; + tmplvb = lustre_swab_repbuf(req, 1, lvb_len, + lvb_swabber); + if (tmplvb == NULL) + GOTO(out_lock, rc = -EPROTO); + if (lvb != NULL) + memcpy(lvb, tmplvb, lvb_len); } } - GOTO(out_req, rc); + GOTO(out_lock, rc); } reply = lustre_swab_repbuf(req, 0, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR("Can't unpack ldlm_reply\n"); - GOTO(out_req, rc = -EPROTO); + GOTO(out_lock, rc = -EPROTO); } memcpy(&lock->l_remote_handle, &reply->lock_handle, @@ -320,21 +332,18 @@ int ldlm_cli_enqueue(struct obd_export *exp, "extent "LPU64" -> "LPU64"\n", body->lock_desc.l_policy_data.l_extent.start, body->lock_desc.l_policy_data.l_extent.end, - reply->lock_policy_data.l_extent.start, - reply->lock_policy_data.l_extent.end); - - cookie = &reply->lock_policy_data; /* FIXME bug 267 */ - cookielen = sizeof(struct ldlm_extent); - } else if (type == LDLM_FLOCK) { - cookie = &reply->lock_policy_data; - cookielen = sizeof(struct ldlm_flock); + reply->lock_desc.l_policy_data.l_extent.start, + reply->lock_desc.l_policy_data.l_extent.end); } + if (policy != NULL) + memcpy(&lock->l_policy_data, &reply->lock_desc.l_policy_data, + sizeof(reply->lock_desc.l_policy_data)); /* If enqueue 
returned a blocked lock but the completion handler has * already run, then it fixed up the resource and we don't need to do it * again. */ if ((*flags) & LDLM_FL_LOCK_CHANGED) { - int newmode = reply->lock_mode; + int newmode = reply->lock_desc.l_req_mode; LASSERT(!is_replay); if (newmode && newmode != lock->l_req_mode) { LDLM_DEBUG(lock, "server returned different mode %s", @@ -342,18 +351,18 @@ int ldlm_cli_enqueue(struct obd_export *exp, lock->l_req_mode = newmode; } - if (reply->lock_resource_name.name[0] != + if (reply->lock_desc.l_resource.lr_name.name[0] != lock->l_resource->lr_name.name[0]) { CDEBUG(D_INFO, "remote intent success, locking %ld " "instead of %ld\n", - (long)reply->lock_resource_name.name[0], + (long)reply->lock_desc.l_resource.lr_name.name[0], (long)lock->l_resource->lr_name.name[0]); ldlm_lock_change_resource(ns, lock, - reply->lock_resource_name); + reply->lock_desc.l_resource.lr_name); if (lock->l_resource == NULL) { LBUG(); - GOTO(out_req, rc = -ENOMEM); + GOTO(out_lock, rc = -ENOMEM); } LDLM_DEBUG(lock, "client-side enqueue, new resource"); } @@ -365,39 +374,41 @@ int ldlm_cli_enqueue(struct obd_export *exp, LDLM_DEBUG(lock, "enqueue reply includes blocking AST"); } + if (lvb_len) { + void *tmplvb; + tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber); + if (tmplvb == NULL) + GOTO(out_lock, rc = -EPROTO); + memcpy(lock->l_lvb_data, tmplvb, lvb_len); + } + if (!is_replay) { - rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags); + rc = ldlm_lock_enqueue(ns, &lock, NULL, flags); if (lock->l_completion_ast != NULL) { int err = lock->l_completion_ast(lock, *flags, NULL); - if (err) - failed_lock_cleanup(ns, lock, lockh, mode); if (!rc) rc = err; } } + if (lvb_len && lvb != NULL) { + /* Copy the LVB here, and not earlier, because the completion + * AST (if any) can override what we got in the reply */ + memcpy(lvb, lock->l_lvb_data, lvb_len); + } + LDLM_DEBUG(lock, "client-side enqueue END"); EXIT; - out_req: - if 
(!req_passed_in) + out_lock: + if (rc) + failed_lock_cleanup(ns, lock, lockh, mode); + if (!req_passed_in && req != NULL) ptlrpc_req_finished(req); - out: LDLM_LOCK_PUT(lock); out_nolock: return rc; } -int ldlm_cli_replay_enqueue(struct ldlm_lock *lock) -{ - struct lustre_handle lockh; - struct ldlm_res_id junk; - int flags = LDLM_FL_REPLAY; - ldlm_lock2handle(lock, &lockh); - return ldlm_cli_enqueue(lock->l_conn_export, NULL, NULL, NULL, junk, - lock->l_resource->lr_type, NULL, 0, -1, &flags, - NULL, NULL, NULL, &lockh); -} - static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags) { diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 416b28b..52cebf1 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -432,7 +432,7 @@ static struct ldlm_resource *ldlm_resource_new(void) INIT_LIST_HEAD(&res->lr_granted); INIT_LIST_HEAD(&res->lr_converting); INIT_LIST_HEAD(&res->lr_waiting); - + sema_init(&res->lr_lvb_sem, 1); atomic_set(&res->lr_refcount, 1); return res; @@ -482,6 +482,7 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, } l_unlock(&ns->ns_lock); + RETURN(res); } @@ -494,6 +495,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, { struct list_head *bucket, *tmp; struct ldlm_resource *res = NULL; + int rc; ENTRY; LASSERT(ns != NULL); @@ -520,6 +522,15 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, l_unlock(&ns->ns_lock); + if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { + rc = ns->ns_lvbo->lvbo_init(res); + if (rc) { + CERROR("lvbo_init failure %d\n", rc); + LASSERT(ldlm_resource_putref(res) == 1); + res = NULL; + } + } + RETURN(res); } @@ -579,6 +590,8 @@ int ldlm_resource_putref(struct ldlm_resource *res) ns->ns_refcount--; list_del_init(&res->lr_hash); list_del_init(&res->lr_childof); + if (res->lr_lvb_data) + OBD_FREE(res->lr_lvb_data, res->lr_lvb_len); l_unlock(&ns->ns_lock); 
OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); @@ -626,7 +639,6 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) { desc->lr_type = res->lr_type; memcpy(&desc->lr_name, &res->lr_name, sizeof(desc->lr_name)); - memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version)); } void ldlm_dump_all_namespaces(void) diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 8b173cc..94dc98d 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "llite_internal.h" @@ -193,7 +194,12 @@ void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft) it->it_op_release = ll_intent_release; } +#if (LUSTRE_KERNEL_VERSION < 33) int ll_revalidate_it(struct dentry *de, int flags, struct lookup_intent *it) +#else +int ll_revalidate_it(struct dentry *de, int flags, struct nameidata *nd, + struct lookup_intent *it) +#endif { int rc; struct ll_fid pfid, cfid; diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 35a6553..7733155 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -87,7 +87,7 @@ static int ll_dir_readpage(struct file *file, struct page *page) } rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_PLAIN, NULL, 0, LCK_PR, &lockh); + &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); if (!rc) { ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 487c6c2..b98a066 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -39,14 +39,11 @@ static int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode, struct ll_file_data *fd = file->private_data; struct ptlrpc_request *req = NULL; struct obd_client_handle *och = &fd->fd_mds_och; - struct ll_inode_info *lli = ll_i2info(inode); struct obdo obdo; int rc, valid; ENTRY; valid = OBD_MD_FLID; - if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) - valid |= OBD_MD_FLSIZE | 
OBD_MD_FLBLOCKS; memset(&obdo, 0, sizeof(obdo)); obdo.o_id = inode->i_ino; @@ -288,17 +285,45 @@ static inline void ll_remove_suid(struct inode *inode) } } -/* Flush the page cache for an extent as its canceled. No one can dirty the - * extent until we've finished our work and they can enqueue another lock. - * The DLM protects us from ll_file_read/write here, but other kernel actors - * could have pages locked */ +static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct obd_export *exp = ll_i2obdexp(inode); + struct { + char name[16]; + struct ldlm_lock *lock; + struct lov_stripe_md *lsm; + } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm }; + __u32 stripe, vallen = sizeof(stripe); + int rc; + ENTRY; + + if (lsm->lsm_stripe_count == 1) + RETURN(0); + + /* get our offset in the lov */ + rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe); + if (rc != 0) { + CERROR("obd_get_info: rc = %d\n", rc); + LBUG(); + } + LASSERT(stripe < lsm->lsm_stripe_count); + RETURN(stripe); +} + +/* Flush the page cache for an extent as its canceled. When we're on an LOV, + * we get a lock cancellation for each stripe, so we have to map the obd's + * region back onto the stripes in the file that it held. + * + * No one can dirty the extent until we've finished our work and they can + * enqueue another lock. The DLM protects us from ll_file_read/write here, + * but other kernel actors could have pages locked. 
*/ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, - struct ldlm_lock *lock) + struct ldlm_lock *lock, __u32 stripe) { struct ldlm_extent *extent = &lock->l_policy_data.l_extent; - struct obd_export *exp = ll_i2obdexp(inode); - struct ll_inode_info *lli = ll_i2info(inode); - unsigned long start, end, i; + unsigned long start, end, count, skip, i, j; struct page *page; int rc, discard = lock->l_flags & LDLM_FL_DISCARD_DATA; ENTRY; @@ -306,26 +331,50 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n", inode->i_ino, inode, extent->start, extent->end, inode->i_size); + /* our locks are page granular thanks to osc_enqueue, we invalidate the + * whole page. */ + LASSERT((extent->start & ~PAGE_CACHE_MASK) == 0); + LASSERT(((extent->end+1) & ~PAGE_CACHE_MASK) == 0); + start = extent->start >> PAGE_CACHE_SHIFT; + count = ~0; + skip = 0; end = (extent->end >> PAGE_CACHE_SHIFT) + 1; if ((end << PAGE_CACHE_SHIFT) < extent->end) end = ~0; + if (lsm->lsm_stripe_count > 1) { + count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT; + skip = (lsm->lsm_stripe_count - 1) * count; + start += (start/count * skip) + (stripe * count); + if (end != ~0) + end += (end/count * skip) + (stripe * count); + } i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT; - if (end >= i) - clear_bit(LLI_F_HAVE_OST_SIZE_LOCK, - &(ll_i2info(inode)->lli_flags)); if (i < end) end = i; - CDEBUG(D_INODE, "walking page indices start: %lu end: %lu\n", start, - end); + CDEBUG(D_INODE, "walking page indices start: %lu j: %lu count: %lu " + "skip: %lu end: %lu%s\n", start, start % count, count, skip, end, + discard ? " (DISCARDING)" : ""); + + /* this is the simplistic implementation of page eviction at + * cancelation. It is careful to get races with other page + * lockers handled correctly. 
fixes from bug 20 will make it + * more efficient by associating locks with pages and with + * batching writeback under the lock explicitly. */ + for (i = start, j = start % count ; ; j++, i++) { + if (j == count) { + i += skip; + j = 0; + } + if (i >= end) + break; - for (i = start; i < end; i++) { ll_pgcache_lock(inode->i_mapping); if (list_empty(&inode->i_mapping->dirty_pages) && - list_empty(&inode->i_mapping->clean_pages) && - list_empty(&inode->i_mapping->locked_pages)) { + list_empty(&inode->i_mapping->clean_pages) && + list_empty(&inode->i_mapping->locked_pages)) { CDEBUG(D_INODE, "nothing left\n"); ll_pgcache_unlock(inode->i_mapping); break; @@ -337,8 +386,7 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, page = find_get_page(inode->i_mapping, i); if (page == NULL) continue; - - LL_CDEBUG_PAGE(page, "locking\n"); + LL_CDEBUG_PAGE(page, "locking page\n"); lock_page(page); /* page->mapping to check with racing against teardown */ @@ -350,11 +398,7 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, list_add(&page->list, &inode->i_mapping->locked_pages); ll_pgcache_unlock(inode->i_mapping); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - rc = inode->i_mapping->a_ops->writepage(page); -#else - rc = inode->i_mapping->a_ops->writepage(page, NULL); -#endif + rc = ll_call_writepage(inode, page); if (rc != 0) CERROR("writepage of page %p failed: %d\n", page, rc); @@ -371,16 +415,6 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, unlock_page(page); page_cache_release(page); } - - if (test_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags)) { - rc = obd_lock_contains(exp, lsm, lock, inode->i_size - 1); - if (rc != 0) { - if (rc < 0) - CERROR("obd_lock_contains: rc = %d\n", rc); - clear_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags); - } - } - EXIT; } @@ -392,7 +426,6 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, int rc; ENTRY; - if ((unsigned long)data > 
0 && (unsigned long)data < 0x1000) { LDLM_ERROR(lock, "cancelling lock with bad data %p", data); LBUG(); @@ -408,17 +441,32 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, case LDLM_CB_CANCELING: { struct inode *inode = ll_inode_from_lock(lock); struct ll_inode_info *lli; + struct lov_stripe_md *lsm; + __u32 stripe; + __u64 kms; - if (!inode) - RETURN(0); - lli= ll_i2info(inode); - if (!lli) + if (inode == NULL) RETURN(0); - if (!lli->lli_smd) - RETURN(0); - - ll_pgcache_remove_extent(inode, lli->lli_smd, lock); + lli = ll_i2info(inode); + if (lli == NULL) + goto iput; + if (lli->lli_smd == NULL) + goto iput; + lsm = lli->lli_smd; + + stripe = ll_lock_to_stripe_offset(inode, lock); + ll_pgcache_remove_extent(inode, lsm, lock, stripe); + + down(&inode->i_sem); + kms = ldlm_extent_shift_kms(lock, + lsm->lsm_oinfo[stripe].loi_kms); + if (lsm->lsm_oinfo[stripe].loi_kms != kms) + LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, + lsm->lsm_oinfo[stripe].loi_kms, kms); + lsm->lsm_oinfo[stripe].loi_kms = kms; + up(&inode->i_sem); //ll_try_done_writing(inode); + iput: iput(inode); break; } @@ -429,112 +477,181 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, RETURN(0); } -/* - * some callers, notably truncate, really don't want i_size set based - * on the the size returned by the getattr, or lock acquisition in - * the future. 
- */ -int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode, - struct lov_stripe_md *lsm, - int mode, struct ldlm_extent *extent, - struct lustre_handle *lockh, int ast_flags) +#if 0 +int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data) { - struct ll_sb_info *sbi = ll_i2sbi(inode); - int rc; + /* XXX ALLOCATE - 160 bytes */ + struct inode *inode = ll_inode_from_lock(lock); + struct ll_inode_info *lli = ll_i2info(inode); + struct lustre_handle lockh = { 0 }; + struct ost_lvb *lvb; + __u32 stripe; ENTRY; - LASSERT(lockh->cookie == 0); + if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | + LDLM_FL_BLOCK_CONV)) { + LBUG(); /* not expecting any blocked async locks yet */ + LDLM_DEBUG(lock, "client-side async enqueue returned a blocked " + "lock, returning"); + ldlm_lock_dump(D_OTHER, lock, 0); + ldlm_reprocess_all(lock->l_resource); + RETURN(0); + } - /* XXX phil: can we do this? won't it screw the file size up? */ - if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || - (sbi->ll_flags & LL_SBI_NOLCK)) + LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed"); + + stripe = ll_lock_to_stripe_offset(inode, lock); + + if (lock->l_lvb_len) { + struct lov_stripe_md *lsm = lli->lli_smd; + __u64 kms; + lvb = lock->l_lvb_data; + lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size; + + down(&inode->i_sem); + kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size); + kms = ldlm_extent_shift_kms(NULL, kms); + if (lsm->lsm_oinfo[stripe].loi_kms != kms) + LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, + lsm->lsm_oinfo[stripe].loi_kms, kms); + lsm->lsm_oinfo[stripe].loi_kms = kms; + up(&inode->i_sem); + } + + iput(inode); + wake_up(&lock->l_waitq); + + ldlm_lock2handle(lock, &lockh); + ldlm_lock_decref(&lockh, LCK_PR); + RETURN(0); +} +#endif + +/* This function is a disaster. I hate the LOV. 
*/ +static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp) +{ + struct ptlrpc_request *req = reqp; + struct inode *inode = ll_inode_from_lock(lock); + struct obd_export *exp; + struct ll_inode_info *lli; + struct ost_lvb *lvb; + struct { + int stripe_number; + __u64 size; + struct lov_stripe_md *lsm; + } data; + __u32 vallen = sizeof(data); + int rc, size = sizeof(*lvb); + ENTRY; + + if (inode == NULL) RETURN(0); + lli = ll_i2info(inode); + if (lli == NULL) + goto iput; + if (lli->lli_smd == NULL) + goto iput; + exp = ll_i2obdexp(inode); + + /* First, find out which stripe index this lock corresponds to. */ + if (lli->lli_smd->lsm_stripe_count > 1) + data.stripe_number = ll_lock_to_stripe_offset(inode, lock); + else + data.stripe_number = 0; - CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n", - inode->i_ino, extent->start, extent->end); + data.size = inode->i_size; + data.lsm = lli->lli_smd; - rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent, - sizeof(extent), mode, &ast_flags, - ll_extent_lock_callback, inode, lockh); + rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe", + &vallen, &data); + if (rc != 0) { + CERROR("obd_get_info: rc = %d\n", rc); + LBUG(); + } + + LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu", + inode->i_size, data.stripe_number, data.size); + + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc) { + CERROR("lustre_pack_reply: %d\n", rc); + goto iput; + } + + lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb)); + lvb->lvb_size = data.size; + ptlrpc_reply(req); + + iput: + iput(inode); + RETURN(0); +} + +__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms); +__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time); + +/* NB: lov_merge_size will prefer locally cached writes if they extend the + * file (because it prefers KMS over RSS when larger) */ +int ll_glimpse_size(struct inode *inode, struct ost_lvb *lvb) +{ + struct ll_inode_info *lli = 
ll_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; + struct lustre_handle lockh; + int rc, flags = LDLM_FL_HAS_INTENT; + ENTRY; + + CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino); + + rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy, + LCK_PR, &flags, ll_extent_lock_callback, + ldlm_completion_ast, ll_glimpse_callback, inode, + sizeof(*lvb), lustre_swab_ost_lvb, &lockh); if (rc > 0) - rc = -EIO; + RETURN(-EIO); + + lvb->lvb_size = lov_merge_size(lli->lli_smd, 0); + //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime); + + CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size); + + obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh); + RETURN(rc); } -/* - * this grabs a lock and manually implements behaviour that makes it look like - * the OST is returning the file size with each lock acquisition. - */ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, struct lov_stripe_md *lsm, int mode, - struct ldlm_extent *extent, struct lustre_handle *lockh) + ldlm_policy_data_t *policy, struct lustre_handle *lockh, + int ast_flags) { - struct ll_inode_info *lli = ll_i2info(inode); - struct obd_export *exp = ll_i2obdexp(inode); - struct ldlm_extent size_lock; - struct lustre_handle match_lockh = {0}; - struct obdo oa; - obd_flag refresh_valid; - int flags, rc, matched; + struct ll_sb_info *sbi = ll_i2sbi(inode); + int rc; ENTRY; - rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0); - if (rc != ELDLM_OK) - RETURN(rc); + LASSERT(lockh->cookie == 0); - if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) + /* XXX phil: can we do this? won't it screw the file size up? 
*/ + if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || + (sbi->ll_flags & LL_SBI_NOLCK)) RETURN(0); - rc = ll_lsm_getattr(exp, lsm, &oa); - if (rc) - GOTO(out, rc); + CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n", + inode->i_ino, policy->l_extent.start, policy->l_extent.end); - /* We set this flag in commit write as we extend the file size. When - * the bit is set and the lock is canceled that covers the file size, - * we clear the bit. This is enough to protect the window where our - * local size extension is needed for writeback. However, it relies on - * behaviour that won't be true in the near future. This assumes that - * all getattr callers get extent locks, which they currnetly do. It - * also assumes that we only send discarding asts for {0,eof} truncates - * as is currently the case. This will have to be replaced by the - * proper eoc communication between clients and the ost, which is on - * its way. */ - refresh_valid = (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME | OBD_MD_FLSIZE); - if (test_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags)) { - if (oa.o_size < inode->i_size) - refresh_valid &= ~OBD_MD_FLSIZE; - else - clear_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags); - } - obdo_refresh_inode(inode, &oa, refresh_valid); - - CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n", - lsm->lsm_object_id, inode->i_size, inode->i_blocks, - inode->i_blksize); - - size_lock.start = inode->i_size; - size_lock.end = OBD_OBJECT_EOF; - - /* XXX I bet we should be checking the lock ignore flags.. */ - flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED; - matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock, - sizeof(size_lock), LCK_PR, &flags, inode, - &match_lockh); - if (matched < 0) - GOTO(out, rc = matched); - - /* hey, alright, we hold a size lock that covers the size we - * just found, its not going to change for a while.. 
*/ - if (matched == 1) { - set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags); - obd_cancel(exp, lsm, LCK_PR, &match_lockh); - } + rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode, + &ast_flags, ll_extent_lock_callback, + ldlm_completion_ast, ll_glimpse_callback, inode, + sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh); + if (rc > 0) + rc = -EIO; + + if (policy->l_extent.start == 0 && + policy->l_extent.end == OBD_OBJECT_EOF) + inode->i_size = lov_merge_size(lsm, 1); + + //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime); - rc = 0; -out: - if (rc) - ll_extent_unlock(fd, inode, lsm, mode, lockh); RETURN(rc); } @@ -564,9 +681,10 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, struct ll_inode_info *lli = ll_i2info(inode); struct lov_stripe_md *lsm = lli->lli_smd; struct lustre_handle lockh = { 0 }; - struct ldlm_extent extent; + ldlm_policy_data_t policy; ldlm_error_t err; ssize_t retval; + __u64 kms; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n", inode->i_ino, inode->i_generation, inode, count, *ppos); @@ -582,20 +700,28 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, if (!lsm) RETURN(0); - /* grab a -> eof extent to push extending writes out of node's caches - * so we can see them at the getattr after lock acquisition. this will - * turn into a seperate [*ppos + count, EOF] 'size intent' lock attempt - * in the future. 
*/ - extent.start = *ppos; - extent.end = OBD_OBJECT_EOF; + policy.l_extent.start = *ppos; + policy.l_extent.end = *ppos + count - 1; - err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh); + err = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0); if (err != ELDLM_OK) RETURN(err); + kms = lov_merge_size(lsm, 1); + if (policy.l_extent.end > kms) { + /* A glimpse is necessary to determine whether we return a short + * read or some zeroes at the end of the buffer */ + struct ost_lvb lvb; + retval = ll_glimpse_size(inode, &lvb); + if (retval) + goto out; + inode->i_size = lvb.lvb_size; + } else { + inode->i_size = kms; + } - CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n", - inode->i_ino, count, *ppos); + CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, i_size " + LPU64"\n", inode->i_ino, count, *ppos, inode->i_size); /* turn off the kernel's read-ahead */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) @@ -605,7 +731,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, #endif retval = generic_file_read(filp, buf, count, ppos); - /* XXX errors? 
*/ + out: ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh); RETURN(retval); } @@ -620,11 +746,10 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, struct inode *inode = file->f_dentry->d_inode; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; struct lustre_handle lockh = { 0 }; - struct ldlm_extent extent; + ldlm_policy_data_t policy; loff_t maxbytes = ll_file_maxbytes(inode); ldlm_error_t err; ssize_t retval; - char should_validate = 1; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n", inode->i_ino, inode->i_generation, inode, count, *ppos); @@ -638,23 +763,14 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, LASSERT(lsm); if (file->f_flags & O_APPEND) { - extent.start = 0; - extent.end = OBD_OBJECT_EOF; + policy.l_extent.start = 0; + policy.l_extent.end = OBD_OBJECT_EOF; } else { - extent.start = *ppos; - extent.end = *ppos + count - 1; - /* we really don't care what i_size is if we're doing - * fully page aligned writes */ - if ((*ppos & ~PAGE_CACHE_MASK) == 0 && - (count & ~PAGE_CACHE_MASK) == 0) - should_validate = 0; + policy.l_extent.start = *ppos; + policy.l_extent.end = *ppos + count - 1; } - if (should_validate) - err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh); - else - err = ll_extent_lock_no_validate(fd, inode, lsm, LCK_PW, - &extent, &lockh, 0); + err = ll_extent_lock(fd, inode, lsm, LCK_PW, &policy, &lockh, 0); if (err != ELDLM_OK) RETURN(err); @@ -939,8 +1055,8 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK); if (origin == 2) { /* SEEK_END */ ldlm_error_t err; - struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; - err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh); + ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }}; + err = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,0); if (err != ELDLM_OK) RETURN(err); @@ -1019,7 
+1135,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) struct ldlm_res_id res_id = { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} }; struct lustre_handle lockh = {0}; - struct ldlm_flock flock; + ldlm_policy_data_t flock; ldlm_mode_t mode = 0; int flags = 0; int rc; @@ -1028,9 +1144,9 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n", inode->i_ino, file_lock); - flock.pid = file_lock->fl_pid; - flock.start = file_lock->fl_start; - flock.end = file_lock->fl_end; + flock.l_flock.pid = file_lock->fl_pid; + flock.l_flock.start = file_lock->fl_start; + flock.l_flock.end = file_lock->fl_end; switch (file_lock->fl_type) { case F_RDLCK: @@ -1074,14 +1190,14 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) } CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, " - "start="LPU64", end="LPU64"\n", inode->i_ino, flock.pid, - flags, mode, flock.start, flock.end); + "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid, + flags, mode, flock.l_flock.start, flock.l_flock.end); obddev = sbi->ll_mdc_exp->exp_obd; rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, obddev->obd_namespace, - NULL, res_id, LDLM_FLOCK, &flock, sizeof(flock), - mode, &flags, ldlm_flock_completion_ast, NULL, - file_lock, &lockh); + res_id, LDLM_FLOCK, &flock, mode, &flags, + NULL, ldlm_flock_completion_ast, NULL, file_lock, + NULL, 0, NULL, &lockh); RETURN(rc); } @@ -1105,13 +1221,13 @@ static int ll_have_md_lock(struct dentry *de) flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, 0, LCK_PR, &lockh)) { + NULL, LCK_PR, &lockh)) { ldlm_lock_decref(&lockh, LCK_PR); RETURN(1); } if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, 0, LCK_PW, &lockh)) { + NULL, LCK_PW, &lockh)) { ldlm_lock_decref(&lockh, LCK_PW); RETURN(1); } @@ -1121,6 +1237,7 @@ 
static int ll_have_md_lock(struct dentry *de) int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) { struct inode *inode = dentry->d_inode; + struct ll_inode_info *lli; struct lov_stripe_md *lsm; ENTRY; @@ -1128,6 +1245,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) CERROR("REPORT THIS LINE TO PETER\n"); RETURN(0); } + lli = ll_i2info(inode); CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n", inode->i_ino, inode->i_generation, inode, dentry->d_name.name); #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0)) @@ -1159,30 +1277,18 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) ptlrpc_req_finished(req); } -#if 0 - if (ll_have_md_lock(dentry) && - test_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &ll_i2info(inode)->lli_flags)) - RETURN(0); -#endif - - lsm = ll_i2info(inode)->lli_smd; - if (!lsm) /* object not yet allocated, don't validate size */ + lsm = lli->lli_smd; + if (lsm == NULL) /* object not yet allocated, don't validate size */ RETURN(0); - /* unfortunately stat comes in through revalidate and we don't - * differentiate this use from initial instantiation. we're - * also being wildly conservative and flushing write caches - * so that stat really returns the proper size. 
*/ + /* ll_glimpse_size will prefer locally cached writes if they extend + * the file */ { - struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; - struct lustre_handle lockh = {0}; + struct ost_lvb lvb; ldlm_error_t err; - err = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh); - if (err != ELDLM_OK) - RETURN(err); - - ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh); + err = ll_glimpse_size(inode, &lvb); + inode->i_size = lvb.lvb_size; } RETURN(0); } diff --git a/lustre/llite/llite_close.c b/lustre/llite/llite_close.c index bf064c0..3e1c195 100644 --- a/lustre/llite/llite_close.c +++ b/lustre/llite/llite_close.c @@ -109,6 +109,7 @@ void ll_queue_done_writing(struct inode *inode) EXIT; } +#if 0 /* If we know the file size and have the cookies: * - send a DONE_WRITING rpc * @@ -120,7 +121,7 @@ void ll_queue_done_writing(struct inode *inode) static void ll_close_done_writing(struct inode *inode) { struct ll_inode_info *lli = ll_i2info(inode); - struct ldlm_extent extent = { .start = 0, .end = OBD_OBJECT_EOF }; + ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF } }; struct lustre_handle lockh = { 0 }; struct obdo obdo; obd_flag valid; @@ -131,8 +132,8 @@ static void ll_close_done_writing(struct inode *inode) if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) goto rpc; - rc = ll_extent_lock_no_validate(NULL, inode, lli->lli_smd, LCK_PW, - &extent, &lockh, ast_flags); + rc = ll_extent_lock(NULL, inode, lli->lli_smd, LCK_PW, &policy, &lockh, + ast_flags); if (rc != ELDLM_OK) { CERROR("lock acquisition failed (%d): unable to send " "DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino, @@ -169,8 +170,8 @@ static void ll_close_done_writing(struct inode *inode) rc = mdc_done_writing(ll_i2sbi(inode)->ll_mdc_exp, &obdo); out: - iput(inode); } +#endif static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq) { @@ -212,7 +213,7 @@ static int ll_close_thread(void *arg) while (1) { struct l_wait_info lwi = { 0 }; struct ll_inode_info *lli; 
- struct inode *inode; + //struct inode *inode; l_wait_event_exclusive(lcq->lcq_waitq, (lli = ll_close_next_lli(lcq)) != NULL, @@ -220,8 +221,9 @@ static int ll_close_thread(void *arg) if (IS_ERR(lli)) break; - inode = ll_info2i(lli); - ll_close_done_writing(inode); + //inode = ll_info2i(lli); + //ll_close_done_writing(inode); + //iput(inode); } complete(&lcq->lcq_comp); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index d51ff14..799dabe 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -171,17 +171,14 @@ extern struct inode_operations ll_file_inode_operations; extern struct inode_operations ll_special_inode_operations; extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *); int ll_extent_lock(struct ll_file_data *, struct inode *, - struct lov_stripe_md *, int mode, struct ldlm_extent *, - struct lustre_handle *); + struct lov_stripe_md *, int mode, ldlm_policy_data_t *, + struct lustre_handle *, int ast_flags); int ll_extent_unlock(struct ll_file_data *, struct inode *, struct lov_stripe_md *, int mode, struct lustre_handle *); int ll_file_open(struct inode *inode, struct file *file); int ll_file_release(struct inode *inode, struct file *file); int ll_lsm_getattr(struct obd_export *, struct lov_stripe_md *, struct obdo *); -int ll_extent_lock_no_validate(struct ll_file_data *, struct inode *, - struct lov_stripe_md *, int mode, - struct ldlm_extent *, struct lustre_handle *, - int ast_flags); +int ll_glimpse_size(struct inode *inode, struct ost_lvb *lvb); #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct lookup_intent *it, struct kstat *stat); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index e5801a0..bc417230 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -881,8 +881,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) * last one is especially bad for racing 
o_append users on other * nodes. */ if (ia_valid & ATTR_SIZE) { - struct ldlm_extent extent = { .start = attr->ia_size, - .end = OBD_OBJECT_EOF }; + ldlm_policy_data_t policy = { .l_extent = {attr->ia_size, + OBD_OBJECT_EOF } }; struct lustre_handle lockh = { 0 }; int err, ast_flags = 0; /* XXX when we fix the AST intents to pass the discard-range @@ -894,22 +894,21 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* bug 1639: avoid write/truncate i_sem/DLM deadlock */ LASSERT(atomic_read(&inode->i_sem.count) <= 0); up(&inode->i_sem); - rc = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW, - &extent, &lockh, ast_flags); + rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh, + ast_flags); down(&inode->i_sem); if (rc != ELDLM_OK) RETURN(rc); rc = vmtruncate(inode, attr->ia_size); - if (rc == 0) - set_bit(LLI_F_HAVE_OST_SIZE_LOCK, - &ll_i2info(inode)->lli_flags); - - //ll_try_done_writing(inode); + /* We need to drop the semaphore here, because this unlock may + * result in a cancellation, which will need the i_sem */ + up(&inode->i_sem); /* unlock now as we don't mind others file lockers racing with * the mds updates below? 
*/ err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); + down(&inode->i_sem); if (err) { CERROR("ll_extent_unlock failed: %d\n", err); if (!rc) diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 2630b5f..4c59d71 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -41,6 +41,7 @@ #include #include #include +#include #include "llite_internal.h" /* methods */ @@ -286,9 +287,8 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset, /* If this is a stat, get the authoritative file size */ if (it->it_op == IT_GETATTR && S_ISREG(inode->i_mode) && ll_i2info(inode)->lli_smd != NULL) { - struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; - struct lustre_handle lockh = {0}; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + struct ost_lvb lvb; ldlm_error_t rc; LASSERT(lsm->lsm_object_id != 0); @@ -296,13 +296,12 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset, /* bug 2334: drop MDS lock before acquiring OST lock */ ll_intent_drop_lock(it); - rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, - &lockh); - if (rc != ELDLM_OK) { + rc = ll_glimpse_size(inode, &lvb); + if (rc) { iput(inode); RETURN(-EIO); } - ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh); + inode->i_size = lvb.lvb_size; } dentry = *de = ll_find_alias(inode, dentry); @@ -320,8 +319,14 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset, } +#if (LUSTRE_KERNEL_VERSION < 33) static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, struct lookup_intent *it, int flags) +#else +static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, + struct nameidata *nd, + struct lookup_intent *it, int flags) +#endif { struct dentry *save = dentry, *retval; struct ll_fid pfid; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 113c5ac..bf0594c 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -126,6 +126,7 @@ void ll_truncate(struct inode *inode) oa.o_id, 
inode->i_size); /* truncate == punch from new size to absolute end of file */ + /* NB: obd_punch must be called with i_sem held! It updates the kms! */ rc = obd_punch(ll_i2obdexp(inode), &oa, lsm, inode->i_size, OBD_OBJECT_EOF, NULL); if (rc) @@ -139,6 +140,7 @@ void ll_truncate(struct inode *inode) return; } /* ll_truncate */ +__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms); int ll_prepare_write(struct file *file, struct page *page, unsigned from, unsigned to) { @@ -148,6 +150,7 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; struct brw_page pga; struct obdo oa; + __u64 kms; int rc = 0; ENTRY; @@ -179,10 +182,11 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, RETURN(0); } - /* If are writing to a new page, no need to read old data. - * the extent locking and getattr procedures in ll_file_write have - * guaranteed that i_size is stable enough for our zeroing needs */ - if (inode->i_size <= offset) { + /* If are writing to a new page, no need to read old data. The extent + * locking will have updated the KMS, and for our purposes here we can + * treat it like i_size. */ + kms = lov_merge_size(lsm, 1); + if (kms <= offset) { memset(kmap(page), 0, PAGE_SIZE); kunmap(page); GOTO(prepare_done, rc = 0); @@ -374,6 +378,8 @@ struct ll_async_page *llap_from_page(struct page *page) RETURN(llap); } +void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm, + obd_off size); /* update our write count to account for i_size increases that may have * happened since we've queued the page for io. 
*/ @@ -449,13 +455,10 @@ free_osic: out: if (rc == 0) { - /* XXX needs to be pushed down to the OSC as EOC */ size = (((obd_off)page->index) << PAGE_SHIFT) + to; - if (size > inode->i_size) { + lov_increase_kms(exp, lsm, size); + if (size > inode->i_size) inode->i_size = size; - /* see commentary in file.c:ll_inode_getattr() */ - set_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags); - } SetPageUptodate(page); } RETURN(rc); @@ -524,17 +527,17 @@ static int ll_page_matches(struct page *page) { struct lustre_handle match_lockh = {0}; struct inode *inode = page->mapping->host; - struct ldlm_extent page_extent; + ldlm_policy_data_t page_extent; int flags, matches; ENTRY; - page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT; - page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1; + page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT; + page_extent.l_extent.end = + page_extent.l_extent.start + PAGE_CACHE_SIZE - 1; flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED; matches = obd_match(ll_i2sbi(inode)->ll_osc_exp, ll_i2info(inode)->lli_smd, LDLM_EXTENT, - &page_extent, sizeof(page_extent), - LCK_PR, &flags, inode, &match_lockh); + &page_extent, LCK_PR, &flags, inode, &match_lockh); if (matches < 0) { LL_CDEBUG_PAGE(page, "lock match failed\n"); RETURN(matches); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 03506e63..47a447e 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -54,6 +54,65 @@ static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, int stripeno, obd_off *obd_off); +struct lov_lock_handles { + struct portals_handle llh_handle; + atomic_t llh_refcount; + int llh_stripe_count; + struct lustre_handle llh_handles[0]; +}; + +static void lov_llh_addref(void *llhp) +{ + struct lov_lock_handles *llh = llhp; + + atomic_inc(&llh->llh_refcount); + CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh, + atomic_read(&llh->llh_refcount)); +} + +static struct lov_lock_handles *lov_llh_new(struct 
lov_stripe_md *lsm) +{ + struct lov_lock_handles *llh; + + OBD_ALLOC(llh, sizeof *llh + + sizeof(*llh->llh_handles) * lsm->lsm_stripe_count); + if (llh == NULL) { + CERROR("out of memory\n"); + return NULL; + } + atomic_set(&llh->llh_refcount, 2); + llh->llh_stripe_count = lsm->lsm_stripe_count; + INIT_LIST_HEAD(&llh->llh_handle.h_link); + class_handle_hash(&llh->llh_handle, lov_llh_addref); + return llh; +} + +static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle) +{ + ENTRY; + LASSERT(handle != NULL); + RETURN(class_handle2object(handle->cookie)); +} + +static void lov_llh_put(struct lov_lock_handles *llh) +{ + CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh, + atomic_read(&llh->llh_refcount) - 1); + LASSERT(atomic_read(&llh->llh_refcount) > 0 && + atomic_read(&llh->llh_refcount) < 0x5a5a); + if (atomic_dec_and_test(&llh->llh_refcount)) { + LASSERT(list_empty(&llh->llh_handle.h_link)); + OBD_FREE(llh, sizeof *llh + + sizeof(*llh->llh_handles) * llh->llh_stripe_count); + } +} + +static void lov_llh_destroy(struct lov_lock_handles *llh) +{ + class_handle_unhash(&llh->llh_handle); + lov_llh_put(llh); +} + /* obd methods */ int lov_attach(struct obd_device *dev, obd_count len, void *data) { @@ -637,6 +696,8 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, } else { --ost_start_count; ost_start_idx += lsm->lsm_stripe_count; + if (lsm->lsm_stripe_count == ost_count) + ++ost_start_idx; } ost_idx = ost_start_idx % ost_count; } else { @@ -1196,6 +1257,59 @@ static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, return ret; } +/* Given a whole-file size and a stripe number, give the file size which + * corresponds to the individual object of that stripe. 
+ * + * This behaves basically in the same was as lov_stripe_offset, except that + * file sizes falling before the beginning of a stripe are clamped to the end + * of the previous stripe, not the beginning of the next: + * + * S + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + * + * if clamped to stripe 2 becomes: + * + * S + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + */ +static obd_off lov_size_to_stripe(struct lov_stripe_md *lsm, obd_off file_size, + int stripeno) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off, this_stripe; + + if (file_size == OBD_OBJECT_EOF) + return OBD_OBJECT_EOF; + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_off = do_div(file_size, swidth); + + this_stripe = stripeno * ssize; + if (stripe_off < this_stripe) { + /* Move to end of previous stripe, or zero */ + if (file_size > 0) { + file_size--; + stripe_off = ssize; + } else { + stripe_off = 0; + } + } else { + stripe_off -= this_stripe; + + if (stripe_off >= ssize) { + /* Clamp to end of this stripe */ + stripe_off = ssize; + } + } + + return (file_size * ssize + stripe_off); +} + /* given an extent in an lov and a stripe, calculate the extent of the stripe * that is contained within the lov extent. this returns true if the given * stripe does intersect with the lov extent. 
*/ @@ -1289,6 +1403,8 @@ static int lov_punch(struct obd_export *exp, struct obdo *oa, } if (!rc) rc = err; + } else { + loi->loi_kms = loi->loi_rss = starti; } } RETURN(rc); @@ -1885,16 +2001,19 @@ static int lov_teardown_async_page(struct obd_export *exp, } static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, - struct lustre_handle *parent_lock, - __u32 type, void *cookie, int cookielen, __u32 mode, - int *flags, void *cb, void *data, + __u32 type, ldlm_policy_data_t *policy, __u32 mode, + int *flags, void *bl_cb, void *cp_cb, void *gl_cb, + void *data,__u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh) { + struct lov_lock_handles *lov_lockh = NULL; + struct lustre_handle *lov_lockhp; struct lov_obd *lov; struct lov_oinfo *loi; - struct lov_stripe_md submd; - struct ldlm_extent *extent = cookie; - int rc; + char submd_buf[sizeof(struct lov_stripe_md) + sizeof(struct lov_oinfo)]; + struct lov_stripe_md *submd = (void *)submd_buf; + ldlm_error_t rc; + int i, save_flags = *flags; ENTRY; if (lsm_bad_magic(lsm)) @@ -1906,42 +2025,133 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, if (!exp || !exp->exp_obd) RETURN(-ENODEV); + if (lsm->lsm_stripe_count > 1) { + lov_lockh = lov_llh_new(lsm); + if (lov_lockh == NULL) + RETURN(-ENOMEM); + + lockh->cookie = lov_lockh->llh_handle.h_cookie; + lov_lockhp = lov_lockh->llh_handles; + } else { + lov_lockhp = lockh; + } + lov = &exp->exp_obd->u.lov; - loi = lsm->lsm_oinfo; - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - RETURN(-EIO); - } - - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; - /* XXX submd is not fully initialized here */ - *flags = 0; - rc = obd_enqueue(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, - parent_lock, type, extent, sizeof(*extent), - mode, flags, cb, data, lockh); - - if (rc != ELDLM_OK) { - memset(lockh, 0, 
sizeof(*lockh)); - if (lov->tgts[loi->loi_ost_idx].active) - CERROR("error: enqueue objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - lsm->lsm_object_id, loi->loi_id, - loi->loi_ost_idx, rc); + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++, lov_lockhp++) { + ldlm_policy_data_t sub_ext; + + if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, + policy->l_extent.end, + &sub_ext.l_extent.start, + &sub_ext.l_extent.end)) + continue; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + continue; + } + + /* XXX LOV STACKING: submd should be from the subobj */ + submd->lsm_object_id = loi->loi_id; + submd->lsm_stripe_count = 0; + submd->lsm_oinfo->loi_rss = loi->loi_rss; + submd->lsm_oinfo->loi_kms = loi->loi_kms; + loi->loi_mtime = submd->lsm_oinfo->loi_mtime; + /* XXX submd is not fully initialized here */ + *flags = save_flags; + rc = obd_enqueue(lov->tgts[loi->loi_ost_idx].ltd_exp, submd, + type, &sub_ext, mode, flags, bl_cb, cp_cb, + gl_cb, data, lvb_len, lvb_swabber, lov_lockhp); + + /* XXX FIXME: This unpleasantness doesn't belong here at *all*. + * It belongs in the OSC, except that the OSC doesn't have + * access to the real LOI -- it gets a copy, that we created + * above, and that copy can be arbitrarily out of date. + * + * The LOV API is due for a serious rewriting anyways, and this + * can be addressed then. 
*/ + if (rc == ELDLM_OK) { + struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); + __u64 tmp = submd->lsm_oinfo->loi_rss; + + LASSERT(lock != NULL); + loi->loi_rss = tmp; + // Extend KMS up to the end of this lock, and no further + if (tmp > lock->l_policy_data.l_extent.end) + tmp = lock->l_policy_data.l_extent.end; + if (tmp > loi->loi_kms) { + CDEBUG(D_INODE, "lock acquired, setting rss=" + LPU64", kms="LPU64"\n", loi->loi_rss, + tmp); + loi->loi_kms = tmp; + } else { + CDEBUG(D_INODE, "lock acquired, setting rss=" + LPU64"; leaving kms="LPU64", end="LPU64 + "\n", loi->loi_rss, loi->loi_kms, + lock->l_policy_data.l_extent.end); + } + ldlm_lock_allow_match(lock); + LDLM_LOCK_PUT(lock); + } else if (rc == ELDLM_LOCK_ABORTED && + save_flags & LDLM_FL_HAS_INTENT) { + memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + loi->loi_rss = submd->lsm_oinfo->loi_rss; + CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" + " kms="LPU64"\n", loi->loi_rss, loi->loi_kms); + } else { + memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + if (lov->tgts[loi->loi_ost_idx].active) { + CERROR("error: enqueue objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + lsm->lsm_object_id, loi->loi_id, + loi->loi_ost_idx, rc); + GOTO(out_locks, rc); + } + } } - RETURN(rc); + if (lsm->lsm_stripe_count > 1) + lov_llh_put(lov_lockh); + RETURN(ELDLM_OK); + + out_locks: + while (loi--, lov_lockhp--, i-- > 0) { + struct lov_stripe_md submd; + int err; + + if (lov_lockhp->cookie == 0) + continue; + + /* XXX LOV STACKING: submd should be from the subobj */ + submd.lsm_object_id = loi->loi_id; + submd.lsm_stripe_count = 0; + err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, + mode, lov_lockhp); + if (err && lov->tgts[loi->loi_ost_idx].active) { + CERROR("error: cancelling objid "LPX64" on OST " + "idx %d after enqueue error: rc = %d\n", + loi->loi_id, loi->loi_ost_idx, err); + } + } + + if (lsm->lsm_stripe_count > 1) { + lov_llh_destroy(lov_lockh); + lov_llh_put(lov_lockh); + } + return 
rc; } static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, - __u32 type, void *cookie, int cookielen, __u32 mode, + __u32 type, ldlm_policy_data_t *policy, __u32 mode, int *flags, void *data, struct lustre_handle *lockh) { + struct lov_lock_handles *lov_lockh = NULL; + struct lustre_handle *lov_lockhp; struct lov_obd *lov; struct lov_oinfo *loi; struct lov_stripe_md submd; - struct ldlm_extent *extent = cookie; - int rc = 0; + ldlm_error_t rc = 0; + int i; ENTRY; if (lsm_bad_magic(lsm)) @@ -1950,19 +2160,75 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, if (!exp || !exp->exp_obd) RETURN(-ENODEV); + if (lsm->lsm_stripe_count > 1) { + lov_lockh = lov_llh_new(lsm); + if (lov_lockh == NULL) + RETURN(-ENOMEM); + + lockh->cookie = lov_lockh->llh_handle.h_cookie; + lov_lockhp = lov_lockh->llh_handles; + } else { + lov_lockhp = lockh; + } + lov = &exp->exp_obd->u.lov; - loi = lsm->lsm_oinfo; - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - RETURN(-EIO); + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++, lov_lockhp++) { + ldlm_policy_data_t sub_ext; + int lov_flags; + + if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, + policy->l_extent.end, + &sub_ext.l_extent.start, + &sub_ext.l_extent.end)) + continue; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + rc = -EIO; + break; + } + + /* XXX LOV STACKING: submd should be from the subobj */ + submd.lsm_object_id = loi->loi_id; + submd.lsm_stripe_count = 0; + lov_flags = *flags; + /* XXX submd is not fully initialized here */ + rc = obd_match(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, + type, &sub_ext, mode, &lov_flags, data, + lov_lockhp); + if (rc != 1) + break; + } + if (rc == 1) { + if (lsm->lsm_stripe_count > 1) + lov_llh_put(lov_lockh); + RETURN(1); + } + + while (loi--, lov_lockhp--, i-- > 0) { + struct lov_stripe_md 
submd; + int err; + + if (lov_lockhp->cookie == 0) + continue; + + /* XXX LOV STACKING: submd should be from the subobj */ + submd.lsm_object_id = loi->loi_id; + submd.lsm_stripe_count = 0; + err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, + mode, lov_lockhp); + if (err && lov->tgts[loi->loi_ost_idx].active) { + CERROR("error: cancelling objid "LPX64" on OST " + "idx %d after match failure: rc = %d\n", + loi->loi_id, loi->loi_ost_idx, err); + } } - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; - /* XXX submd is not fully initialized here */ - rc = obd_match(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, type, - extent, sizeof(*extent), mode, flags, data, lockh); + if (lsm->lsm_stripe_count > 1) { + lov_llh_destroy(lov_lockh); + lov_llh_put(lov_lockh); + } RETURN(rc); } @@ -1998,10 +2264,11 @@ static int lov_change_cbdata(struct obd_export *exp, static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, __u32 mode, struct lustre_handle *lockh) { + struct lov_lock_handles *lov_lockh = NULL; + struct lustre_handle *lov_lockhp; struct lov_obd *lov; struct lov_oinfo *loi; - struct lov_stripe_md submd; - int rc = 0; + int rc = 0, i; ENTRY; if (lsm_bad_magic(lsm)) @@ -2010,20 +2277,52 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, if (!exp || !exp->exp_obd) RETURN(-ENODEV); + LASSERT(lockh); + if (lsm->lsm_stripe_count > 1) { + lov_lockh = lov_handle2llh(lockh); + if (!lov_lockh) { + CERROR("LOV: invalid lov lock handle %p\n", lockh); + RETURN(-EINVAL); + } + + lov_lockhp = lov_lockh->llh_handles; + } else { + lov_lockhp = lockh; + } + lov = &exp->exp_obd->u.lov; - loi = lsm->lsm_oinfo; + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++, lov_lockhp++) { + struct lov_stripe_md submd; + int err; - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; - 
rc = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, - mode, lockh); - if (rc && lov->tgts[loi->loi_ost_idx].active) - CERROR("error: cancel objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - lsm->lsm_object_id, loi->loi_id, loi->loi_ost_idx, rc); - GOTO(out, rc); -out: + if (lov_lockhp->cookie == 0) { + CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n", + loi->loi_ost_idx, loi->loi_id); + continue; + } + + /* XXX LOV STACKING: submd should be from the subobj */ + submd.lsm_object_id = loi->loi_id; + submd.lsm_stripe_count = 0; + err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, + mode, lov_lockhp); + if (err) { + if (lov->tgts[loi->loi_ost_idx].active) { + CERROR("error: cancel objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + lsm->lsm_object_id, + loi->loi_id, loi->loi_ost_idx, err); + if (!rc) + rc = err; + } + } + } + + if (lsm->lsm_stripe_count > 1) + lov_llh_destroy(lov_lockh); + if (lov_lockh != NULL) + lov_llh_put(lov_lockh); RETURN(rc); } @@ -2228,7 +2527,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len, static int lov_get_info(struct obd_export *exp, __u32 keylen, void *key, __u32 *vallen, void *val) -{ +{ struct obd_device *obddev = class_exp2obd(exp); struct lov_obd *lov = &obddev->u.lov; int i; @@ -2257,13 +2556,27 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen, for (i = 0, loi = data->lsm->lsm_oinfo; i < data->lsm->lsm_stripe_count; i++, loi++) { - if (lov->tgts[loi->loi_ost_idx].ltd_exp == + if (lov->tgts[loi->loi_ost_idx].ltd_exp == data->lock->l_conn_export) { *stripe = i; RETURN(0); } } RETURN(-ENXIO); + } else if (keylen >= strlen("size_to_stripe") && + strcmp(key, "size_to_stripe") == 0) { + struct { + int stripe_number; + __u64 size; + struct lov_stripe_md *lsm; + } *data = val; + + if (*vallen < sizeof(*data)) + RETURN(-EFAULT); + + data->size = lov_size_to_stripe(data->lsm, data->size, + data->stripe_number); + RETURN(0); } else if (keylen >= 
strlen("last_id") && strcmp(key, "last_id") == 0) { obd_id *ids = val; int rc, size = sizeof(obd_id); @@ -2340,37 +2653,157 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, } +/* Merge rss if kms == 0 + * + * Even when merging RSS, we will take the KMS value if it's larger. + * This prevents getattr from stomping on dirty cached pages which + * extend the file size. */ +__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms) +{ + struct lov_oinfo *loi; + __u64 size = 0; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++) { + obd_size lov_size, tmpsize; + + tmpsize = loi->loi_kms; + if (kms == 0 && loi->loi_rss > tmpsize) + tmpsize = loi->loi_rss; + + lov_size = lov_stripe_size(lsm, tmpsize, i); + if (lov_size > size) + size = lov_size; + } + return size; +} +EXPORT_SYMBOL(lov_merge_size); + +__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time) +{ + struct lov_oinfo *loi; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++) { + if (loi->loi_mtime > current_time) + current_time = loi->loi_mtime; + } + return current_time; +} +EXPORT_SYMBOL(lov_merge_mtime); + +#if 0 +struct lov_multi_wait { + struct ldlm_lock *lock; + wait_queue_t wait; + int completed; + int generation; +}; -static int lov_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm, - struct ldlm_lock *lock, obd_off offset) +int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm, + struct lustre_handle *lockh) { + struct lov_lock_handles *lov_lockh = NULL; + struct lustre_handle *lov_lockhp; struct lov_obd *lov; struct lov_oinfo *loi; - struct lov_stripe_md submd; - int rc; + struct lov_multi_wait *queues; + int rc = 0, i; ENTRY; - LASSERT(lsm != NULL); - if (exp == NULL) + if (lsm_bad_magic(lsm)) + RETURN(-EINVAL); + + if (!exp || !exp->exp_obd) RETURN(-ENODEV); + LASSERT(lockh != NULL); + if (lsm->lsm_stripe_count > 1) { + lov_lockh = lov_handle2llh(lockh); + if 
(lov_lockh == NULL) { + CERROR("LOV: invalid lov lock handle %p\n", lockh); + RETURN(-EINVAL); + } + + lov_lockhp = lov_lockh->llh_handles; + } else { + lov_lockhp = lockh; + } + + OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues)); + if (queues == NULL) + GOTO(out, rc = -ENOMEM); + lov = &exp->exp_obd->u.lov; - loi = lsm->lsm_oinfo; + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++, lov_lockhp++) { + struct ldlm_lock *lock; + struct obd_device *obd; + unsigned long irqflags; + + lock = ldlm_handle2lock(lov_lockhp); + if (lock == NULL) { + CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n", + loi->loi_ost_idx, loi->loi_id); + queues[i].completed = 1; + continue; + } - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - RETURN(-EIO); + queues[i].lock = lock; + init_waitqueue_entry(&(queues[i].wait), current); + add_wait_queue(lock->l_waitq, &(queues[i].wait)); + + obd = class_exp2obd(lock->l_conn_export); + if (obd != NULL) + imp = obd->u.cli.cl_import; + if (imp != NULL) { + spin_lock_irqsave(&imp->imp_lock, irqflags); + queues[i].generation = imp->imp_generation; + spin_unlock_irqrestore(&imp->imp_lock, irqflags); + } } - - /* XXX submd is not fully initialized here */ - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; - rc = obd_lock_contains(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, - lock, offset); + lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait, + interrupted_completion_wait, &lwd); + rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi); + + for (i = 0; i < lsm->lsm_stripe_count; i++) + remove_wait_queue(lock->l_waitq, &(queues[i].wait)); + + if (rc == -EINTR || rc == -ETIMEDOUT) { + + + } + + out: + if (lov_lockh != NULL) + lov_llh_put(lov_lockh); RETURN(rc); } +#endif + +void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm, + obd_off size) 
+{ + struct lov_oinfo *loi; + int stripe = 0; + __u64 kms; + ENTRY; + + if (size > 0) + stripe = lov_stripe_number(lsm, size - 1); + kms = lov_size_to_stripe(lsm, size, stripe); + loi = &(lsm->lsm_oinfo[stripe]); + + CDEBUG(D_INODE, "stripe %d KMS %sincreasing "LPU64"->"LPU64"\n", + stripe, kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms); + if (kms > loi->loi_kms) + loi->loi_kms = kms; + EXIT; +} +EXPORT_SYMBOL(lov_increase_kms); struct obd_ops lov_obd_ops = { o_owner: THIS_MODULE, @@ -2408,7 +2841,6 @@ struct obd_ops lov_obd_ops = { o_set_info: lov_set_info, o_llog_init: lov_llog_init, o_llog_finish: lov_llog_finish, - o_lock_contains:lov_lock_contains, o_notify: lov_notify, }; diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 8fbe9b7..f102439 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -284,9 +284,10 @@ int mdc_enqueue(struct obd_export *exp, } mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, NULL, res_id, - lock_type, NULL, 0, lock_mode, &flags, - cb_completion, cb_blocking, cb_data, lockh); + rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id, + lock_type, NULL, lock_mode, &flags, cb_blocking, + cb_completion, NULL, cb_data, NULL, 0, NULL, + lockh); mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); /* Similarly, if we're going to replay this request, we don't want to @@ -319,6 +320,7 @@ int mdc_enqueue(struct obd_export *exp, lock_mode = lock->l_req_mode; } + ldlm_lock_allow_match(lock); LDLM_LOCK_PUT(lock); } @@ -424,13 +426,12 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, mode = LCK_PR; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, flags, - &res_id, LDLM_PLAIN, NULL, 0, LCK_PR, - &lockh); + &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); if (!rc) { mode = LCK_PW; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, flags, - &res_id, LDLM_PLAIN, NULL, 0, - LCK_PW, &lockh); + &res_id, LDLM_PLAIN, NULL, LCK_PW, + &lockh); } if 
(rc) { memcpy(&it->d.lustre.it_lock_handle, &lockh, @@ -532,7 +533,7 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, LDLM_LOCK_PUT(lock); memcpy(&old_lock, &lockh, sizeof(lockh)); if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_PLAIN, NULL, 0, LCK_NL, &old_lock)) { + LDLM_PLAIN, NULL, LCK_NL, &old_lock)) { ldlm_lock_decref_and_cancel(&lockh, it->d.lustre.it_lock_mode); memcpy(&lockh, &old_lock, sizeof(old_lock)); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 8ceb655..51de280 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -486,16 +486,16 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo, NULL, NULL); rc = l_wait_event(req->rq_reply_waitq, mdc_close_check_reply(req), &lwi); - - if (rc == 0) { + if (rc == 0) { rc = req->rq_repmsg->status; if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) { - DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err = %d", rc); + DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err " + "= %d", rc); if (rc > 0) rc = -rc; } else if (mod == NULL) { - CERROR("Unexpected: can't find mdc_open_data, but the close " - "succeeded. Please tell CFS.\n"); + CERROR("Unexpected: can't find mdc_open_data, but the " + "close succeeded. 
Please tell CFS.\n"); } } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index fbb3a3c..5d8e2e2 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -56,6 +56,9 @@ #include "mds_internal.h" +static int mds_intent_policy(struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + ldlm_mode_t mode, int flags, void *data); static int mds_postsetup(struct obd_device *obd); static int mds_cleanup(struct obd_device *obd, int flags); @@ -167,10 +170,10 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, res_id.name[0] = de->d_inode->i_ino; res_id.name[1] = de->d_inode->i_generation; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, - res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, - mds_blocking_ast, NULL, lockh); + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, + LDLM_PLAIN, NULL, lock_mode, &flags, + mds_blocking_ast, ldlm_completion_ast, NULL, NULL, + NULL, 0, NULL, lockh); if (rc != ELDLM_OK) { l_dput(de); retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ @@ -418,7 +421,7 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, /* XXX layering violation! -phil */ l_lock(&lock->l_resource->lr_namespace->ns_lock); - /* Get this: if mds_blocking_ast is racing with ldlm_intent_policy, + /* Get this: if mds_blocking_ast is racing with mds_intent_policy, * such that mds_blocking_ast is called just before l_i_p takes the * ns_lock, then by the time we get the lock, we might not be the * correct blocking function anymore. 
So check, and return early, if @@ -1205,7 +1208,7 @@ int mds_handle(struct ptlrpc_request *req) DEBUG_REQ(D_INODE, req, "enqueue"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast, - ldlm_server_blocking_ast); + ldlm_server_blocking_ast, NULL); break; case LDLM_CONVERT: DEBUG_REQ(D_INODE, req, "convert"); @@ -1369,6 +1372,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) mds_cleanup(obd, 0); GOTO(err_put, rc = -ENOMEM); } + ldlm_register_intent(obd->obd_namespace, mds_intent_policy); rc = mds_fs_setup(obd, mnt); if (rc) { @@ -1624,159 +1628,146 @@ void intent_set_disposition(struct ldlm_reply *rep, int flag) rep->lock_policy_res1 |= flag; } -static int ldlm_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data) +static int mds_intent_policy(struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + ldlm_mode_t mode, int flags, void *data) { struct ptlrpc_request *req = req_cookie; struct ldlm_lock *lock = *lockp; - int rc; + struct ldlm_intent *it; + struct mds_obd *mds = &req->rq_export->exp_obd->u.mds; + struct ldlm_reply *rep; + struct lustre_handle lockh = { 0 }; + struct ldlm_lock *new_lock; + int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply), + sizeof(struct mds_body), + mds->mds_max_mdsize, + mds->mds_max_cookiesize}; ENTRY; - if (!req_cookie) + LASSERT(req != NULL); + + if (req->rq_reqmsg->bufcount <= 1) { + /* No intent was provided */ + int size = sizeof(struct ldlm_reply); + rc = lustre_pack_reply(req, 1, &size, NULL); + LASSERT(rc == 0); RETURN(0); + } - if (req->rq_reqmsg->bufcount > 1) { - /* an intent needs to be considered */ - struct ldlm_intent *it; - struct mds_obd *mds = &req->rq_export->exp_obd->u.mds; - struct ldlm_reply *rep; - struct lustre_handle lockh = { 0 }; - struct ldlm_lock *new_lock; - int offset = 2, repsize[4] = {sizeof(struct ldlm_reply), - sizeof(struct 
mds_body), - mds->mds_max_mdsize, - mds->mds_max_cookiesize}; - - it = lustre_swab_reqbuf(req, 1, sizeof (*it), - lustre_swab_ldlm_intent); - if (it == NULL) { - CERROR ("Intent missing\n"); - req->rq_status = -EFAULT; - RETURN(req->rq_status); - } + it = lustre_swab_reqbuf(req, 1, sizeof(*it), lustre_swab_ldlm_intent); + if (it == NULL) { + CERROR("Intent missing\n"); + RETURN(req->rq_status = -EFAULT); + } - LDLM_DEBUG(lock, "intent policy, opc: %s", - ldlm_it2str(it->opc)); + LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); - rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, - repsize, NULL); - if (rc) - RETURN(req->rq_status = rc); + rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize, + NULL); + if (rc) + RETURN(req->rq_status = rc); - rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); - intent_set_disposition(rep, DISP_IT_EXECD); + rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); + intent_set_disposition(rep, DISP_IT_EXECD); - fixup_handle_for_resent_req(req, lock, &lockh); + fixup_handle_for_resent_req(req, lock, &lockh); - /* execute policy */ - switch ((long)it->opc) { - case IT_OPEN: - case IT_CREAT|IT_OPEN: - /* XXX swab here to assert that an mds_open reint - * packet is following */ - rep->lock_policy_res2 = mds_reint(req, offset, &lockh); + /* execute policy */ + switch ((long)it->opc) { + case IT_OPEN: + case IT_CREAT|IT_OPEN: + /* XXX swab here to assert that an mds_open reint + * packet is following */ + rep->lock_policy_res2 = mds_reint(req, offset, &lockh); #if 0 - /* We abort the lock if the lookup was negative and - * we did not make it to the OPEN portion */ - if (!intent_disposition(rep, DISP_LOOKUP_EXECD)) - RETURN(ELDLM_LOCK_ABORTED); - if (intent_disposition(rep, DISP_LOOKUP_NEG) && - !intent_disposition(rep, DISP_OPEN_OPEN)) + /* We abort the lock if the lookup was negative and + * we did not make it to the OPEN portion */ + if (!intent_disposition(rep, DISP_LOOKUP_EXECD)) + 
RETURN(ELDLM_LOCK_ABORTED); + if (intent_disposition(rep, DISP_LOOKUP_NEG) && + !intent_disposition(rep, DISP_OPEN_OPEN)) #endif - RETURN(ELDLM_LOCK_ABORTED); - break; - case IT_GETATTR: - case IT_LOOKUP: - case IT_READDIR: - rep->lock_policy_res2 = mds_getattr_name(offset, req, - &lockh); - /* FIXME: LDLM can set req->rq_status. MDS sets - policy_res{1,2} with disposition and status. - - replay: returns 0 & req->status is old status - - otherwise: returns req->status */ - if (intent_disposition(rep, DISP_LOOKUP_NEG)) - rep->lock_policy_res2 = 0; - if (!intent_disposition(rep, DISP_LOOKUP_POS) || - rep->lock_policy_res2) - RETURN(ELDLM_LOCK_ABORTED); - if (req->rq_status != 0) { - LBUG(); - rep->lock_policy_res2 = req->rq_status; - RETURN(ELDLM_LOCK_ABORTED); - } - break; - default: - CERROR("Unhandled intent "LPD64"\n", it->opc); + RETURN(ELDLM_LOCK_ABORTED); + break; + case IT_GETATTR: + case IT_LOOKUP: + case IT_READDIR: + rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh); + /* FIXME: LDLM can set req->rq_status. MDS sets + policy_res{1,2} with disposition and status. + - replay: returns 0 & req->status is old status + - otherwise: returns req->status */ + if (intent_disposition(rep, DISP_LOOKUP_NEG)) + rep->lock_policy_res2 = 0; + if (!intent_disposition(rep, DISP_LOOKUP_POS) || + rep->lock_policy_res2) + RETURN(ELDLM_LOCK_ABORTED); + if (req->rq_status != 0) { LBUG(); + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); } + break; + default: + CERROR("Unhandled intent "LPD64"\n", it->opc); + LBUG(); + } - /* By this point, whatever function we called above must have - * either filled in 'lockh', been an intent replay, or returned - * an error. We want to allow replayed RPCs to not get a lock, - * since we would just drop it below anyways because lock replay - * is done separately by the client afterwards. For regular - * RPCs we want to give the new lock to the client instead of - * whatever lock it was about to get. 
- */ - new_lock = ldlm_handle2lock(&lockh); - if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) - RETURN(0); - - LASSERT(new_lock != NULL); + /* By this point, whatever function we called above must have either + * filled in 'lockh', been an intent replay, or returned an error. We + * want to allow replayed RPCs to not get a lock, since we would just + * drop it below anyways because lock replay is done separately by the + * client afterwards. For regular RPCs we want to give the new lock to + * the client instead of whatever lock it was about to get. */ + new_lock = ldlm_handle2lock(&lockh); + if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) + RETURN(0); - /* If we've already given this lock to a client once, then we - * should have no readers or writers. Otherwise, we should - * have one reader _or_ writer ref (which will be zeroed below) - * before returning the lock to a client. - */ - if (new_lock->l_export == req->rq_export) { - LASSERT(new_lock->l_readers + new_lock->l_writers == 0); - } else { - LASSERT(new_lock->l_export == NULL); - LASSERT(new_lock->l_readers + new_lock->l_writers == 1); - } + LASSERT(new_lock != NULL); - *lockp = new_lock; + /* If we've already given this lock to a client once, then we should + * have no readers or writers. Otherwise, we should have one reader + * _or_ writer ref (which will be zeroed below) before returning the + * lock to a client. */ + if (new_lock->l_export == req->rq_export) { + LASSERT(new_lock->l_readers + new_lock->l_writers == 0); + } else { + LASSERT(new_lock->l_export == NULL); + LASSERT(new_lock->l_readers + new_lock->l_writers == 1); + } - if (new_lock->l_export == req->rq_export) { - /* Already gave this to the client, which means that we - * reconstructed a reply. 
*/ - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & - MSG_RESENT); - RETURN(ELDLM_LOCK_REPLACED); - } + *lockp = new_lock; - /* Fixup the lock to be given to the client */ - l_lock(&new_lock->l_resource->lr_namespace->ns_lock); - new_lock->l_readers = 0; - new_lock->l_writers = 0; + if (new_lock->l_export == req->rq_export) { + /* Already gave this to the client, which means that we + * reconstructed a reply. */ + LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & + MSG_RESENT); + RETURN(ELDLM_LOCK_REPLACED); + } - new_lock->l_export = class_export_get(req->rq_export); - list_add(&new_lock->l_export_chain, - &new_lock->l_export->exp_ldlm_data.led_held_locks); + /* Fixup the lock to be given to the client */ + l_lock(&new_lock->l_resource->lr_namespace->ns_lock); + new_lock->l_readers = 0; + new_lock->l_writers = 0; - new_lock->l_blocking_ast = lock->l_blocking_ast; - new_lock->l_completion_ast = lock->l_completion_ast; + new_lock->l_export = class_export_get(req->rq_export); + list_add(&new_lock->l_export_chain, + &new_lock->l_export->exp_ldlm_data.led_held_locks); - memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle, - sizeof(lock->l_remote_handle)); + new_lock->l_blocking_ast = lock->l_blocking_ast; + new_lock->l_completion_ast = lock->l_completion_ast; - new_lock->l_flags &= ~LDLM_FL_LOCAL; + memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle, + sizeof(lock->l_remote_handle)); - LDLM_LOCK_PUT(new_lock); - l_unlock(&new_lock->l_resource->lr_namespace->ns_lock); + new_lock->l_flags &= ~LDLM_FL_LOCAL; - RETURN(ELDLM_LOCK_REPLACED); - } else { - int size = sizeof(struct ldlm_reply); - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) { - LBUG(); - RETURN(-ENOMEM); - } - } - RETURN(0); + LDLM_LOCK_PUT(new_lock); + l_unlock(&new_lock->l_resource->lr_namespace->ns_lock); + + RETURN(ELDLM_LOCK_REPLACED); } int mds_attach(struct obd_device *dev, obd_count len, void *data) @@ -1938,14 +1929,12 @@ static int __init mds_init(void) 
class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME); lprocfs_init_multi_vars(1, &lvars); class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME); - ldlm_register_intent(ldlm_intent_policy); return 0; } static void /*__exit*/ mds_exit(void) { - ldlm_unregister_intent(); class_unregister_type(LUSTRE_MDS_NAME); class_unregister_type(LUSTRE_MDT_NAME); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 88724c0..80c89e5 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -740,10 +740,10 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode, if (child_lockh == NULL) child_lockh = &lockh; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, - child_res_id, LDLM_PLAIN, NULL, 0, - LCK_EX, &lock_flags, ldlm_completion_ast, - mds_blocking_ast, NULL, child_lockh); + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, child_res_id, + LDLM_PLAIN, NULL, LCK_EX, &lock_flags, + mds_blocking_ast, ldlm_completion_ast, NULL, NULL, + NULL, 0, NULL, child_lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_enqueue: %d\n", rc); else if (child_lockh == &lockh) diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index b44dc22..3227f34 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -667,11 +667,11 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, * makes it out to the client but the unlink's does not. 
* See bug 2029 for more detail.*/ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - NULL, child_res_id, LDLM_PLAIN, - NULL, 0, LCK_EX, &lock_flags, - ldlm_completion_ast, - mds_blocking_ast, NULL, - &child_ino_lockh); + child_res_id, LDLM_PLAIN, NULL, + LCK_EX, &lock_flags, + mds_blocking_ast, + ldlm_completion_ast, NULL, NULL, + NULL, 0, NULL, &child_ino_lockh); if (rc != ELDLM_OK) { CERROR("error locking for unlink/create sync: " "%d\n", rc); @@ -794,10 +794,10 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, res_id[0]->name[0], res_id[1]->name[0]); flags = LDLM_FL_LOCAL_ONLY; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, *res_id[0], - LDLM_PLAIN, NULL, 0, lock_modes[0], &flags, - ldlm_completion_ast, mds_blocking_ast, NULL, - handles[0]); + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0], + LDLM_PLAIN, NULL, lock_modes[0], &flags, + mds_blocking_ast, ldlm_completion_ast, NULL, NULL, + NULL, 0, NULL, handles[0]); if (rc != ELDLM_OK) RETURN(-EIO); ldlm_lock_dump_handle(D_OTHER, handles[0]); @@ -807,10 +807,11 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, ldlm_lock_addref(handles[1], lock_modes[1]); } else if (res_id[1]->name[0] != 0) { flags = LDLM_FL_LOCAL_ONLY; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, - *res_id[1], LDLM_PLAIN, NULL, 0, - lock_modes[1], &flags,ldlm_completion_ast, - mds_blocking_ast, NULL, handles[1]); + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, + *res_id[1], LDLM_PLAIN, NULL, + lock_modes[1], &flags, mds_blocking_ast, + ldlm_completion_ast, NULL, NULL, NULL, 0, + NULL, handles[1]); if (rc != ELDLM_OK) { ldlm_lock_decref(handles[0], lock_modes[0]); RETURN(-EIO); @@ -883,11 +884,11 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, ldlm_lock_addref(dlm_handles[i], lock_modes[i]); } else { rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - NULL, *res_id[i], LDLM_PLAIN, 
- NULL, 0, lock_modes[i], &flags, - ldlm_completion_ast, - mds_blocking_ast, NULL, - dlm_handles[i]); + *res_id[i], LDLM_PLAIN, NULL, + lock_modes[i], &flags, + mds_blocking_ast, + ldlm_completion_ast, NULL, NULL, + NULL, 0, NULL, dlm_handles[i]); if (rc != ELDLM_OK) GOTO(out_err, rc = -EIO); ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]); @@ -969,9 +970,9 @@ static int mds_verify_child(struct obd_device *obd, } rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - NULL, *child_res_id, LDLM_PLAIN, - NULL, 0, child_mode, &flags, - ldlm_completion_ast, mds_blocking_ast, + *child_res_id, LDLM_PLAIN, NULL, + child_mode, &flags, mds_blocking_ast, + ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, child_lockh); if (rc != ELDLM_OK) GOTO(cleanup, rc = -EIO); diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 17beed2..bc171b2 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -647,7 +647,7 @@ static void cleanup_obdclass(void) #ifdef __KERNEL__ #include #define LUSTRE_MIN_VERSION 28 -#define LUSTRE_MAX_VERSION 32 +#define LUSTRE_MAX_VERSION 33 #if (LUSTRE_KERNEL_VERSION < LUSTRE_MIN_VERSION) # error Cannot continue: Your Lustre kernel patch is older than the sources #elif (LUSTRE_KERNEL_VERSION > LUSTRE_MAX_VERSION) diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 4a96820..41af093 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -642,7 +642,6 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats) LPROCFS_OBD_OP_INIT(num_private_stats, stats, san_preprw); LPROCFS_OBD_OP_INIT(num_private_stats, stats, init_export); LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy_export); - LPROCFS_OBD_OP_INIT(num_private_stats, stats, lock_contains); LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_init); LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_finish); LPROCFS_OBD_OP_INIT(num_private_stats, stats, pin); diff --git 
a/lustre/obdclass/rbtree.c b/lustre/obdclass/rbtree.c deleted file mode 100644 index 9d393d3..0000000 --- a/lustre/obdclass/rbtree.c +++ /dev/null @@ -1,338 +0,0 @@ -/* - Red Black Trees - (C) 1999 Andrea Arcangeli - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - linux/lib/rbtree.c - - rb_get_first and rb_get_next written by Theodore Ts'o, 9/8/2002 -*/ - -#include -#include - -static void __rb_rotate_left(rb_node_t * node, rb_root_t * root) -{ - rb_node_t * right = node->rb_right; - - if ((node->rb_right = right->rb_left)) - right->rb_left->rb_parent = node; - right->rb_left = node; - - if ((right->rb_parent = node->rb_parent)) - { - if (node == node->rb_parent->rb_left) - node->rb_parent->rb_left = right; - else - node->rb_parent->rb_right = right; - } - else - root->rb_node = right; - node->rb_parent = right; -} - -static void __rb_rotate_right(rb_node_t * node, rb_root_t * root) -{ - rb_node_t * left = node->rb_left; - - if ((node->rb_left = left->rb_right)) - left->rb_right->rb_parent = node; - left->rb_right = node; - - if ((left->rb_parent = node->rb_parent)) - { - if (node == node->rb_parent->rb_right) - node->rb_parent->rb_right = left; - else - node->rb_parent->rb_left = left; - } - else - root->rb_node = left; - node->rb_parent = left; -} - -void rb_insert_color(rb_node_t * node, rb_root_t * root) -{ - rb_node_t * 
parent, * gparent; - - while ((parent = node->rb_parent) && parent->rb_color == RB_RED) - { - gparent = parent->rb_parent; - - if (parent == gparent->rb_left) - { - { - register rb_node_t * uncle = gparent->rb_right; - if (uncle && uncle->rb_color == RB_RED) - { - uncle->rb_color = RB_BLACK; - parent->rb_color = RB_BLACK; - gparent->rb_color = RB_RED; - node = gparent; - continue; - } - } - - if (parent->rb_right == node) - { - register rb_node_t * tmp; - __rb_rotate_left(parent, root); - tmp = parent; - parent = node; - node = tmp; - } - - parent->rb_color = RB_BLACK; - gparent->rb_color = RB_RED; - __rb_rotate_right(gparent, root); - } else { - { - register rb_node_t * uncle = gparent->rb_left; - if (uncle && uncle->rb_color == RB_RED) - { - uncle->rb_color = RB_BLACK; - parent->rb_color = RB_BLACK; - gparent->rb_color = RB_RED; - node = gparent; - continue; - } - } - - if (parent->rb_left == node) - { - register rb_node_t * tmp; - __rb_rotate_right(parent, root); - tmp = parent; - parent = node; - node = tmp; - } - - parent->rb_color = RB_BLACK; - gparent->rb_color = RB_RED; - __rb_rotate_left(gparent, root); - } - } - - root->rb_node->rb_color = RB_BLACK; -} -EXPORT_SYMBOL(rb_insert_color); - -static void __rb_erase_color(rb_node_t * node, rb_node_t * parent, - rb_root_t * root) -{ - rb_node_t * other; - - while ((!node || node->rb_color == RB_BLACK) && node != root->rb_node) - { - if (parent->rb_left == node) - { - other = parent->rb_right; - if (other->rb_color == RB_RED) - { - other->rb_color = RB_BLACK; - parent->rb_color = RB_RED; - __rb_rotate_left(parent, root); - other = parent->rb_right; - } - if ((!other->rb_left || - other->rb_left->rb_color == RB_BLACK) - && (!other->rb_right || - other->rb_right->rb_color == RB_BLACK)) - { - other->rb_color = RB_RED; - node = parent; - parent = node->rb_parent; - } - else - { - if (!other->rb_right || - other->rb_right->rb_color == RB_BLACK) - { - register rb_node_t * o_left; - if ((o_left = other->rb_left)) - 
o_left->rb_color = RB_BLACK; - other->rb_color = RB_RED; - __rb_rotate_right(other, root); - other = parent->rb_right; - } - other->rb_color = parent->rb_color; - parent->rb_color = RB_BLACK; - if (other->rb_right) - other->rb_right->rb_color = RB_BLACK; - __rb_rotate_left(parent, root); - node = root->rb_node; - break; - } - } - else - { - other = parent->rb_left; - if (other->rb_color == RB_RED) - { - other->rb_color = RB_BLACK; - parent->rb_color = RB_RED; - __rb_rotate_right(parent, root); - other = parent->rb_left; - } - if ((!other->rb_left || - other->rb_left->rb_color == RB_BLACK) - && (!other->rb_right || - other->rb_right->rb_color == RB_BLACK)) - { - other->rb_color = RB_RED; - node = parent; - parent = node->rb_parent; - } - else - { - if (!other->rb_left || - other->rb_left->rb_color == RB_BLACK) - { - register rb_node_t * o_right; - if ((o_right = other->rb_right)) - o_right->rb_color = RB_BLACK; - other->rb_color = RB_RED; - __rb_rotate_left(other, root); - other = parent->rb_left; - } - other->rb_color = parent->rb_color; - parent->rb_color = RB_BLACK; - if (other->rb_left) - other->rb_left->rb_color = RB_BLACK; - __rb_rotate_right(parent, root); - node = root->rb_node; - break; - } - } - } - if (node) - node->rb_color = RB_BLACK; -} - -void rb_erase(rb_node_t * node, rb_root_t * root) -{ - rb_node_t * child, * parent; - int color; - - if (!node->rb_left) - child = node->rb_right; - else if (!node->rb_right) - child = node->rb_left; - else - { - rb_node_t * old = node, * left; - - node = node->rb_right; - while ((left = node->rb_left)) - node = left; - child = node->rb_right; - parent = node->rb_parent; - color = node->rb_color; - - if (child) - child->rb_parent = parent; - if (parent) - { - if (parent->rb_left == node) - parent->rb_left = child; - else - parent->rb_right = child; - } - else - root->rb_node = child; - - if (node->rb_parent == old) - parent = node; - node->rb_parent = old->rb_parent; - node->rb_color = old->rb_color; - node->rb_right 
= old->rb_right; - node->rb_left = old->rb_left; - - if (old->rb_parent) - { - if (old->rb_parent->rb_left == old) - old->rb_parent->rb_left = node; - else - old->rb_parent->rb_right = node; - } else - root->rb_node = node; - - old->rb_left->rb_parent = node; - if (old->rb_right) - old->rb_right->rb_parent = node; - goto color; - } - - parent = node->rb_parent; - color = node->rb_color; - - if (child) - child->rb_parent = parent; - if (parent) - { - if (parent->rb_left == node) - parent->rb_left = child; - else - parent->rb_right = child; - } - else - root->rb_node = child; - - color: - if (color == RB_BLACK) - __rb_erase_color(child, parent, root); -} -EXPORT_SYMBOL(rb_erase); - -/* - * This function returns the first node (in sort order) of the tree. - */ -rb_node_t *rb_get_first(rb_root_t *root) -{ - rb_node_t *n; - - n = root->rb_node; - if (!n) - return 0; - while (n->rb_left) - n = n->rb_left; - return n; -} -EXPORT_SYMBOL(rb_get_first); - -/* - * Given a node, this function will return the next node in the tree. - */ -rb_node_t *rb_get_next(rb_node_t *n) -{ - rb_node_t *parent; - - if (n->rb_right) { - n = n->rb_right; - while (n->rb_left) - n = n->rb_left; - return n; - } else { - while ((parent = n->rb_parent)) { - if (n == parent->rb_left) - return parent; - n = parent; - } - return 0; - } -} -EXPORT_SYMBOL(rb_get_next); - diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 2f15e62..69862b7 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1078,14 +1078,15 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa, ecl->ecl_mode = mode; ecl->ecl_object = eco; - ecl->ecl_extent.start = offset; - ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1); + ecl->ecl_policy.l_extent.start = offset; + ecl->ecl_policy.l_extent.end = + (nob == 0) ? 
((obd_off) -1) : (offset + nob - 1); flags = 0; - rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, NULL, LDLM_EXTENT, - &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode, - &flags, echo_ldlm_callback, eco, - &ecl->ecl_lock_handle); + rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT, + &ecl->ecl_policy, mode, &flags, echo_ldlm_callback, + ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb), + lustre_swab_ost_lvb, &ecl->ecl_lock_handle); if (rc != 0) goto failed_1; diff --git a/lustre/obdfilter/Makefile.am b/lustre/obdfilter/Makefile.am index fca412b..18fd5f3 100644 --- a/lustre/obdfilter/Makefile.am +++ b/lustre/obdfilter/Makefile.am @@ -7,6 +7,6 @@ MODULE = obdfilter modulefs_DATA = obdfilter.o EXTRA_PROGRAMS = obdfilter obdfilter_SOURCES = filter.c filter_io.c filter_log.c filter_san.c \ - filter_io_24.c lproc_obdfilter.c filter_internal.h + filter_io_24.c lproc_obdfilter.c filter_internal.h filter_lvb.c include $(top_srcdir)/Rules diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index d576705..1cc0a6e 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -902,29 +902,16 @@ static int filter_blocking_ast(struct ldlm_lock *lock, RETURN(0); } -static int filter_lock_dentry(struct obd_device *obd, struct dentry *de, - ldlm_mode_t lock_mode,struct lustre_handle *lockh) +static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent) { - struct ldlm_res_id res_id = { .name = {0} }; - int flags = 0, rc; - ENTRY; - - res_id.name[0] = de->d_inode->i_ino; - res_id.name[1] = de->d_inode->i_generation; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, - res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, - filter_blocking_ast, NULL, lockh); - - RETURN(rc == ELDLM_OK ? 
0 : -EIO); /* XXX translate ldlm code */ + down(&dparent->d_inode->i_sem); + return 0; } /* We never dget the object parent, so DON'T dput it either */ -static void filter_parent_unlock(struct dentry *dparent, - struct lustre_handle *lockh, - ldlm_mode_t lock_mode) +static void filter_parent_unlock(struct dentry *dparent) { - ldlm_lock_decref(lockh, lock_mode); + up(&dparent->d_inode->i_sem); } /* We never dget the object parent, so DON'T dput it either */ @@ -941,20 +928,19 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) /* We never dget the object parent, so DON'T dput it either */ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group, - obd_id objid, ldlm_mode_t lock_mode, - struct lustre_handle *lockh) + obd_id objid) { unsigned long now = jiffies; - struct dentry *de = filter_parent(obd, group, objid); + struct dentry *dparent = filter_parent(obd, group, objid); int rc; - if (IS_ERR(de)) - return de; + if (IS_ERR(dparent)) + return dparent; - rc = filter_lock_dentry(obd, de, lock_mode, lockh); + rc = filter_lock_dentry(obd, dparent); if (time_after(jiffies, now + 15 * HZ)) CERROR("slow parent lock %lus\n", (jiffies - now) / HZ); - return rc ? ERR_PTR(rc) : de; + return rc ? ERR_PTR(rc) : dparent; } /* How to get files, dentries, inodes from object id's. 
@@ -968,7 +954,6 @@ struct dentry *filter_fid2dentry(struct obd_device *obd, struct dentry *dir_dentry, obd_gr group, obd_id id) { - struct lustre_handle lockh; struct dentry *dparent = dir_dentry; struct dentry *dchild; char name[32]; @@ -982,15 +967,15 @@ struct dentry *filter_fid2dentry(struct obd_device *obd, len = sprintf(name, LPU64, id); if (dir_dentry == NULL) { - dparent = filter_parent_lock(obd, group, id, LCK_PR, &lockh); + dparent = filter_parent_lock(obd, group, id); if (IS_ERR(dparent)) RETURN(dparent); } CDEBUG(D_INODE, "looking up object O/%*s/%s\n", dparent->d_name.len, dparent->d_name.name, name); - dchild = ll_lookup_one_len(name, dparent, len); + dchild = /*ll_*/lookup_one_len(name, dparent, len); if (dir_dentry == NULL) - filter_parent_unlock(dparent, &lockh, LCK_PR); + filter_parent_unlock(dparent); if (IS_ERR(dchild)) { CERROR("child lookup error %ld\n", PTR_ERR(dchild)); RETURN(dchild); @@ -1009,21 +994,15 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid) struct lustre_handle lockh; int flags = LDLM_AST_DISCARD_DATA, rc; struct ldlm_res_id res_id = { .name = { objid } }; - struct ldlm_extent extent = { 0, OBD_OBJECT_EOF }; - ENTRY; + ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; + ENTRY; /* Tell the clients that the object is gone now and that they should - * throw away any cached pages. If we're the OST at stripe 0 in the - * file then this enqueue will communicate the DISCARD to all the - * clients. This assumes that we always destroy all the objects for - * a file at a time, as is currently the case. If we're not the - * OST at stripe 0 then we'll harmlessly get a very lonely lock in - * the local DLM and immediately drop it. */ - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, - res_id, LDLM_EXTENT, &extent, - sizeof(extent), LCK_PW, &flags, - ldlm_completion_ast, filter_blocking_ast, - NULL, &lockh); + * throw away any cached pages. 
*/ + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, + LDLM_EXTENT, &policy, LCK_PW, + &flags, filter_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, &lockh); /* We only care about the side-effects, just drop the lock. */ if (rc == ELDLM_OK) @@ -1058,6 +1037,119 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid, RETURN(rc); } +static int filter_intent_policy(struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + ldlm_mode_t mode, int flags, void *data) +{ + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + struct ptlrpc_request *req = req_cookie; + struct ldlm_lock *lock = *lockp, *l = NULL; + struct ldlm_resource *res = lock->l_resource; + ldlm_processing_policy policy; + struct ost_lvb *res_lvb, *reply_lvb; + struct list_head *tmp; + ldlm_error_t err; + int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply), + sizeof(struct ost_lvb) }; + ENTRY; + + policy = ldlm_get_processing_policy(res); + LASSERT(policy != NULL); + LASSERT(req != NULL); + + rc = lustre_pack_reply(req, 2, repsize, NULL); + if (rc) + RETURN(req->rq_status = rc); + + reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb)); + LASSERT(reply_lvb != NULL); + + //fixup_handle_for_resent_req(req, lock, &lockh); + + /* If we grant any lock at all, it will be a whole-file read lock. + * Call the extent policy function to see if our request can be + * granted, or is blocked. 
*/ + lock->l_policy_data.l_extent.start = 0; + lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF; + lock->l_req_mode = LCK_PR; + + l_lock(&res->lr_namespace->ns_lock); + + res->lr_tmp = &rpc_list; + rc = policy(lock, &tmpflags, 0, &err); + res->lr_tmp = NULL; + + /* FIXME: we should change the policy function slightly, to not make + * this list at all, since we just turn around and free it */ + while (!list_empty(&rpc_list)) { + struct ldlm_ast_work *w = + list_entry(rpc_list.next, struct ldlm_ast_work, w_list); + list_del(&w->w_list); + LDLM_LOCK_PUT(w->w_lock); + OBD_FREE(w, sizeof(*w)); + } + + if (rc == LDLM_ITER_CONTINUE) { + /* The lock met with no resistance; we're finished. */ + l_unlock(&res->lr_namespace->ns_lock); + RETURN(ELDLM_LOCK_REPLACED); + } + + /* Do not grant any lock, but instead send GL callbacks. The extent + * policy nicely created a list of all PW locks for us. We will choose + * the highest of those which are larger than the size in the LVB, if + * any, and perform a glimpse callback. */ + down(&res->lr_lvb_sem); + res_lvb = res->lr_lvb_data; + LASSERT(res_lvb != NULL); + reply_lvb->lvb_size = res_lvb->lvb_size; + up(&res->lr_lvb_sem); + + list_for_each(tmp, &res->lr_granted) { + struct ldlm_lock *tmplock = + list_entry(tmp, struct ldlm_lock, l_res_link); + + if (tmplock->l_granted_mode == LCK_PR) + continue; + + if (tmplock->l_policy_data.l_extent.end <= + reply_lvb->lvb_size) + continue; + + if (l == NULL) { + l = LDLM_LOCK_GET(tmplock); + continue; + } + + if (l->l_policy_data.l_extent.start > + tmplock->l_policy_data.l_extent.start) + continue; + + LDLM_LOCK_PUT(l); + l = LDLM_LOCK_GET(tmplock); + } + l_unlock(&res->lr_namespace->ns_lock); + + /* There were no PW locks beyond the size in the LVB; finished. 
*/ + if (l == NULL) + RETURN(ELDLM_LOCK_ABORTED); + + LASSERT(l->l_glimpse_ast != NULL); + rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */ + + down(&res->lr_lvb_sem); +#if 0 + if (res_lvb->lvb_size == reply_lvb->lvb_size) + LDLM_ERROR(l, "we lost the glimpse race!"); +#endif + reply_lvb->lvb_size = res_lvb->lvb_size; + up(&res->lr_lvb_sem); + + LDLM_LOCK_PUT(l); + + RETURN(ELDLM_LOCK_ABORTED); +} + /* mount the file system (secretly) */ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, char *option) @@ -1129,6 +1221,9 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, LDLM_NAMESPACE_SERVER); if (obd->obd_namespace == NULL) GOTO(err_post, rc = -ENOMEM); + obd->obd_namespace->ns_lvbp = obd; + obd->obd_namespace->ns_lvbo = &filter_lvbo; + ldlm_register_intent(obd->obd_namespace, filter_intent_policy); ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, "filter_ldlm_cb_client", &obd->obd_ldlm_client); @@ -1448,6 +1543,8 @@ static int filter_setattr(struct obd_export *exp, struct obdo *oa, struct filter_obd *filter; struct dentry *dentry; struct iattr iattr; + struct ldlm_res_id res_id = { .name = { oa->o_id } }; + struct ldlm_resource *res; void *handle; int rc, rc2; ENTRY; @@ -1487,6 +1584,22 @@ static int filter_setattr(struct obd_export *exp, struct obdo *oa, rc = rc2; } + if (iattr.ia_valid & ATTR_SIZE) { + res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL, + res_id, LDLM_EXTENT, 0); + if (res == NULL) { + CERROR("!!! 
resource_get failed for object "LPU64" -- " + "filter_setattr with no lock?\n", oa->o_id); + } else { + if (res->lr_namespace->ns_lvbo && + res->lr_namespace->ns_lvbo->lvbo_update) { + rc = res->lr_namespace->ns_lvbo->lvbo_update + (res, NULL, 0); + } + ldlm_resource_putref(res); + } + } + oa->o_valid = OBD_MD_FLID; obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); @@ -1625,7 +1738,6 @@ static int filter_should_precreate(struct obd_export *exp, struct obdo *oa, static int filter_precreate(struct obd_device *obd, struct obdo *oa, obd_gr group, int *num) { - struct lustre_handle parent_lockh; struct dentry *dchild = NULL; struct filter_obd *filter; struct dentry *dparent; @@ -1660,8 +1772,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id); - dparent = filter_parent_lock(obd, group, next_id, LCK_PW, - &parent_lockh); + dparent = filter_parent_lock(obd, group, next_id); if (IS_ERR(dparent)) GOTO(cleanup, rc = PTR_ERR(dparent)); cleanup_phase = 1; @@ -1720,7 +1831,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, case 2: f_dput(dchild); case 1: - filter_parent_unlock(dparent, &parent_lockh, LCK_PW); + filter_parent_unlock(dparent); case 0: break; } @@ -1798,7 +1909,6 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa, struct dentry *dchild = NULL, *dparent = NULL; struct obd_run_ctxt saved; void *handle = NULL; - struct lustre_handle parent_lockh; struct llog_cookie *fcc = NULL; int rc, rc2, cleanup_phase = 0, have_prepared = 0; obd_gr group = 0; @@ -1813,8 +1923,7 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa, push_ctxt(&saved, &obd->obd_ctxt, NULL); acquire_locks: - dparent = filter_parent_lock(obd, group, oa->o_id, LCK_PW, - &parent_lockh); + dparent = filter_parent_lock(obd, group, oa->o_id); if (IS_ERR(dparent)) GOTO(cleanup, rc = PTR_ERR(dparent)); cleanup_phase = 1; @@ -1844,7 +1953,7 @@ static int 
filter_destroy(struct obd_export *exp, struct obdo *oa, * complication of condition the above code to skip it on the * second time through. */ f_dput(dchild); - filter_parent_unlock(dparent, &parent_lockh, LCK_PW); + filter_parent_unlock(dparent); filter_prepare_destroy(obd, oa->o_id); have_prepared = 1; @@ -1869,13 +1978,14 @@ cleanup: switch(cleanup_phase) { case 3: if (fcc != NULL) { - if (oti != NULL) { + if (oti != NULL) fsfilt_add_journal_cb(obd, 0, oti->oti_handle, - filter_cancel_cookies_cb, fcc); - } else { + filter_cancel_cookies_cb, + fcc); + else fsfilt_add_journal_cb(obd, 0, handle, - filter_cancel_cookies_cb, fcc); - } + filter_cancel_cookies_cb, + fcc); } rc = filter_finish_transno(exp, oti, rc); rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0); @@ -1887,13 +1997,7 @@ cleanup: case 2: f_dput(dchild); case 1: - if (rc || oti == NULL) { - filter_parent_unlock(dparent, &parent_lockh, LCK_PW); - } else { - memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh, - sizeof(parent_lockh)); - oti->oti_ack_locks[0].mode = LCK_PW; - } + filter_parent_unlock(dparent); case 0: pop_ctxt(&saved, &obd->obd_ctxt, NULL); break; diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 06d852c..93379d8 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -95,9 +95,6 @@ enum { #define FILTER_MAX_CACHE_SIZE OBD_OBJECT_EOF /* filter.c */ -struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid); -struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id, - ldlm_mode_t, struct lustre_handle *); void f_dput(struct dentry *); struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir, obd_gr group, obd_id id); @@ -114,6 +111,10 @@ int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync); int filter_common_setup(struct obd_device *, obd_count len, void *buf, char *option); +/* filter_lvb.c */ +extern struct ldlm_valblock_ops filter_lvbo; + + 
/* filter_io.c */ int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount, struct obd_ioobj *, int niocount, struct niobuf_remote *, diff --git a/lustre/obdfilter/filter_lvb.c b/lustre/obdfilter/filter_lvb.c new file mode 100644 index 0000000..254a3fb --- /dev/null +++ b/lustre/obdfilter/filter_lvb.c @@ -0,0 +1,189 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * linux/fs/obdfilter/filter_log.c + * + * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Phil Schwan + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "filter_internal.h"
+/* Allocate res->lr_lvb_data and fill size/mtime from the object's inode. */
+static int filter_lvbo_init(struct ldlm_resource *res)
+{
+        int rc = 0;
+        struct ost_lvb *lvb = NULL;
+        struct obd_device *obd;
+        struct obdo *oa = NULL;
+        struct dentry *dentry;
+        ENTRY;
+
+        LASSERT(res);
+
+        /* we only want lvb's for object resources */
+        /* check for internal locks: these have name[1] != 0 */
+        if (res->lr_name.name[1])
+                RETURN(0);
+
+        down(&res->lr_lvb_sem);
+        if (res->lr_lvb_data)
+                GOTO(out, rc = 0);
+
+        OBD_ALLOC(lvb, sizeof(*lvb));
+        if (lvb == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        res->lr_lvb_data = lvb;
+        res->lr_lvb_len = sizeof(*lvb);
+
+        obd = res->lr_namespace->ns_lvbp;
+        LASSERT(obd != NULL);
+
+        oa = obdo_alloc();
+        if (oa == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        oa->o_id = res->lr_name.name[0];
+        oa->o_gr = 0;
+        dentry = filter_oa2dentry(obd, oa);
+        if (IS_ERR(dentry))
+                GOTO(out, rc = PTR_ERR(dentry));
+
+        /* Limit the valid bits in the return data to what we actually use */
+        oa->o_valid = OBD_MD_FLID;
+        obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+        /* read the inode before f_dput() releases our dentry reference */
+        lvb->lvb_size = dentry->d_inode->i_size;
+        lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime);
+        f_dput(dentry);
+
+ out:
+        if (oa)
+                obdo_free(oa);
+        if (rc && lvb != NULL) {
+                OBD_FREE(lvb, sizeof(*lvb));
+                res->lr_lvb_data = NULL;
+                res->lr_lvb_len = 0;
+        }
+        up(&res->lr_lvb_sem);
+        return rc;
+}
+
+/* This will be called in two ways:
+ *
+ * m != NULL : called by the DLM itself after a glimpse callback
+ * m == NULL : called by the filter after a disk write
+ */
+static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
+                              int buf_idx)
+{
+        int rc = 0;
+        struct ost_lvb *lvb = res->lr_lvb_data;
+        struct obd_device *obd;
+        struct obdo *oa = NULL;
+        struct dentry *dentry;
+        ENTRY;
+
+        LASSERT(res);
+
+        /* we only want lvb's for object resources */
+        /* check for internal locks: these have name[1] != 0 */
+        if
(res->lr_name.name[1]) + RETURN(0); + + down(&res->lr_lvb_sem); + if (!res->lr_lvb_data) { + CERROR("No lvb when running lvbo_update!\n"); + GOTO(out, rc = 0); + } + + /* Update the LVB from the network message */ + if (m != NULL) { + struct ost_lvb *new; + + new = lustre_swab_buf(m, buf_idx, sizeof(*new), + lustre_swab_ost_lvb); + if (new == NULL) { + CERROR("lustre_swab_buf failed\n"); + //GOTO(out, rc = -EPROTO); + GOTO(out, rc = 0); + } + if (new->lvb_size > lvb->lvb_size) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_size, new->lvb_size); + lvb->lvb_size = new->lvb_size; + } + if (new->lvb_time > lvb->lvb_time) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb time: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_time, new->lvb_time); + lvb->lvb_time = new->lvb_time; + } + GOTO(out, rc = 0); + } + + /* Update the LVB from the disk inode */ + obd = res->lr_namespace->ns_lvbp; + LASSERT(obd); + + oa = obdo_alloc(); + if (oa == NULL) + GOTO(out, rc = -ENOMEM); + + oa->o_id = res->lr_name.name[0]; + oa->o_gr = 0; + dentry = filter_oa2dentry(obd, oa); + if (IS_ERR(dentry)) + GOTO(out, rc = PTR_ERR(dentry)); + + /* Limit the valid bits in the return data to what we actually use */ + oa->o_valid = OBD_MD_FLID; + obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); + + lvb->lvb_size = dentry->d_inode->i_size; + lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime); + CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", time: " + LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_time); + f_dput(dentry); + + out: + if (oa != NULL) + obdo_free(oa); + up(&res->lr_lvb_sem); + return rc; +} + + + +struct ldlm_valblock_ops filter_lvbo = { + lvbo_init: filter_lvbo_init, + lvbo_update: filter_lvbo_update +}; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 38cba81..3635a7b 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -593,6 +593,7 @@ 
void osc_wake_cache_waiters(struct client_obd *cli) } else { osc_consume_write_grant(cli, ocw->ocw_oap); } + wake_up(&ocw->ocw_waitq); } @@ -1700,7 +1701,6 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, if (cli->cl_dirty_max < PAGE_SIZE) return(-EDQUOT); - /* Hopefully normal case - cache space and write credits available */ if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max && cli->cl_avail_grant >= PAGE_SIZE) { @@ -2370,27 +2370,32 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, } static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, - struct lustre_handle *parent_lock, - __u32 type, void *extentp, int extent_len, __u32 mode, - int *flags, void *callback, void *data, + __u32 type, ldlm_policy_data_t *policy, __u32 mode, + int *flags, void *bl_cb, void *cp_cb, void *gl_cb, + void *data, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh) { struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; struct obd_device *obd = exp->exp_obd; - struct ldlm_extent *extent = extentp; + struct ost_lvb lvb; int rc; ENTRY; /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. 
*/ - extent->start -= extent->start & ~PAGE_MASK; - extent->end |= ~PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, - type, extent, sizeof(*extent), mode, lockh); + rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode, + lockh); if (rc == 1) { osc_set_data_with_check(lockh, data); + if (*flags & LDLM_FL_HAS_INTENT) { + /* I would like to be able to ASSERT here that rss <= + * kms, but I can't, for reasons which are explained in + * lov_enqueue() */ + } /* We already have a lock, and it's referenced */ RETURN(ELDLM_OK); } @@ -2409,7 +2414,7 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, if (mode == LCK_PR) { rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, - extent, sizeof(*extent), LCK_PW, lockh); + policy, LCK_PW, lockh); if (rc == 1) { /* FIXME: This is not incredibly elegant, but it might * be more elegant than adding another parameter to @@ -2421,19 +2426,22 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, } } - rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, parent_lock, - res_id, type, extent, sizeof(*extent), mode, - flags,ldlm_completion_ast, callback, data, lockh); + rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, res_id, type, + policy, mode, flags, bl_cb, cp_cb, gl_cb, data, + &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh); + + if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) + lsm->lsm_oinfo->loi_rss = lvb.lvb_size; + RETURN(rc); } static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, - __u32 type, void *extentp, int extent_len, __u32 mode, + __u32 type, ldlm_policy_data_t *policy, __u32 mode, int *flags, void *data, struct lustre_handle *lockh) { struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; struct obd_device *obd 
= exp->exp_obd; - struct ldlm_extent *extent = extentp; int rc; ENTRY; @@ -2441,12 +2449,12 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother */ - extent->start -= extent->start & ~PAGE_MASK; - extent->end |= ~PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, - extent, sizeof(*extent), mode, lockh); + policy, mode, lockh); if (rc) { osc_set_data_with_check(lockh, data); RETURN(rc); @@ -2456,7 +2464,7 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, * writers can share a single PW lock. */ if (mode == LCK_PR) { rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, - extent, sizeof(*extent), LCK_PW, lockh); + policy, LCK_PW, lockh); if (rc == 1) { /* FIXME: This is not incredibly elegant, but it might * be more elegant than adding another parameter to @@ -2846,19 +2854,6 @@ static int osc_disconnect(struct obd_export *exp, int flags) return rc; } -static int osc_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm, - struct ldlm_lock *lock, obd_off offset) -{ - ENTRY; - if (exp == NULL) - RETURN(-ENODEV); - - if (lock->l_policy_data.l_extent.start <= offset && - lock->l_policy_data.l_extent.end >= offset) - RETURN(1); - RETURN(0); -} - static int osc_invalidate_import(struct obd_device *obd, struct obd_import *imp) { @@ -2937,7 +2932,6 @@ struct obd_ops osc_obd_ops = { o_iocontrol: osc_iocontrol, o_get_info: osc_get_info, o_set_info: osc_set_info, - o_lock_contains:osc_lock_contains, o_invalidate_import: osc_invalidate_import, o_llog_init: osc_llog_init, o_llog_finish: osc_llog_finish, @@ -2969,7 +2963,6 @@ struct obd_ops sanosc_obd_ops = { o_cancel: osc_cancel, 
o_cancel_unused:osc_cancel_unused, o_iocontrol: osc_iocontrol, - o_lock_contains:osc_lock_contains, o_invalidate_import: osc_invalidate_import, o_llog_init: osc_llog_init, o_llog_finish: osc_llog_finish, diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index f5c5579..fe6a6da 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1058,7 +1058,8 @@ static int ost_handle(struct ptlrpc_request *req) CDEBUG(D_INODE, "enqueue\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast, - ldlm_server_blocking_ast); + ldlm_server_blocking_ast, + ldlm_server_glimpse_ast); fail = OBD_FAIL_OST_LDLM_REPLY_NET; break; case LDLM_CONVERT: diff --git a/lustre/portals/libcfs/debug.c b/lustre/portals/libcfs/debug.c index 7ad9327..e98779f 100644 --- a/lustre/portals/libcfs/debug.c +++ b/lustre/portals/libcfs/debug.c @@ -945,9 +945,6 @@ char *portals_nid2str(int nal, ptl_nid_t nid, char *str) } #ifdef __KERNEL__ -#include -#if (LUSTRE_KERNEL_VERSION >= 30) -#warning "FIXME: remove workaround when l30 is widely used" char stack_backtrace[LUSTRE_TRACE_SIZE]; spinlock_t stack_backtrace_lock = SPIN_LOCK_UNLOCKED; @@ -958,7 +955,7 @@ extern int is_kernel_text_address(unsigned long addr); char *portals_debug_dumpstack(void) { asm("int $3"); - return "dump stack"; + return "dump stack\n"; } #elif defined(__i386__) @@ -1020,7 +1017,6 @@ char *portals_debug_dumpstack(void) #endif /* __arch_um__ */ EXPORT_SYMBOL(stack_backtrace_lock); EXPORT_SYMBOL(portals_debug_dumpstack); -#endif /* LUSTRE_KERNEL_VERSION < 30 */ #endif /* __KERNEL__ */ EXPORT_SYMBOL(portals_debug_dumplog); diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index f783ebf..0075860 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -69,6 +69,7 @@ struct ll_rpc_opcode { { LDLM_CANCEL, "ldlm_cancel" }, { LDLM_BL_CALLBACK, "ldlm_bl_callback" }, { LDLM_CP_CALLBACK, "ldlm_cp_callback" }, + { 
LDLM_GL_CALLBACK, "ldlm_gl_callback" }, { PTLBD_QUERY, "ptlbd_query" }, { PTLBD_READ, "ptlbd_read" }, { PTLBD_WRITE, "ptlbd_write" }, diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 16ca32a..c735d01 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -318,40 +318,34 @@ char *lustre_msg_string (struct lustre_msg *m, int index, int max_len) return (str); } -/* Wrap up the normal fixed length case */ -void *lustre_swab_reqbuf (struct ptlrpc_request *req, int index, int min_size, - void *swabber) +/* Wrap up the normal fixed length cases */ +void *lustre_swab_buf(struct lustre_msg *msg, int index, int min_size, + void *swabber) { void *ptr; - LASSERT_REQSWAB(req, index); - - ptr = lustre_msg_buf(req->rq_reqmsg, index, min_size); + ptr = lustre_msg_buf(msg, index, min_size); if (ptr == NULL) return NULL; - if (swabber != NULL && lustre_msg_swabbed(req->rq_reqmsg)) + if (swabber != NULL && lustre_msg_swabbed(msg)) ((void (*)(void *))swabber)(ptr); return ptr; } -/* Wrap up the normal fixed length case */ -void *lustre_swab_repbuf (struct ptlrpc_request *req, int index, int min_size, - void *swabber) +void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size, + void *swabber) { - void *ptr; + LASSERT_REQSWAB(req, index); + return lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber); +} +void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size, + void *swabber) +{ LASSERT_REPSWAB(req, index); - - ptr = lustre_msg_buf(req->rq_repmsg, index, min_size); - if (ptr == NULL) - return NULL; - - if (swabber != NULL && lustre_msg_swabbed(req->rq_repmsg)) - ((void (*)(void *))swabber)(ptr); - - return ptr; + return lustre_swab_buf(req->rq_repmsg, index, min_size, swabber); } /* byte flipping routines for all wire types declared in @@ -419,6 +413,12 @@ void lustre_swab_ost_last_id(obd_id *id) __swab64s(id); } +void lustre_swab_ost_lvb(struct ost_lvb *lvb) +{ + 
__swab64s(&lvb->lvb_size); + __swab64s(&lvb->lvb_time); +} + void lustre_swab_ll_fid (struct ll_fid *fid) { __swab64s (&fid->id); @@ -564,24 +564,16 @@ void lustre_swab_ldlm_intent (struct ldlm_intent *i) void lustre_swab_ldlm_resource_desc (struct ldlm_resource_desc *r) { - int i; - __swab32s (&r->lr_type); lustre_swab_ldlm_res_id (&r->lr_name); - for (i = 0; i < RES_VERSION_SIZE; i++) - __swab32s (&r->lr_version[i]); } void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l) { - int i; - lustre_swab_ldlm_resource_desc (&l->l_resource); __swab32s (&l->l_req_mode); __swab32s (&l->l_granted_mode); lustre_swab_ldlm_policy_data (&l->l_policy_data); - for (i = 0; i < RES_VERSION_SIZE; i++) - __swab32s (&l->l_version[i]); } void lustre_swab_ldlm_request (struct ldlm_request *rq) @@ -595,10 +587,8 @@ void lustre_swab_ldlm_request (struct ldlm_request *rq) void lustre_swab_ldlm_reply (struct ldlm_reply *r) { __swab32s (&r->lock_flags); - __swab32s (&r->lock_mode); - lustre_swab_ldlm_res_id (&r->lock_resource_name); + lustre_swab_ldlm_lock_desc (&r->lock_desc); /* lock_handle opaque */ - lustre_swab_ldlm_policy_data (&r->lock_policy_data); __swab64s (&r->lock_policy_res1); __swab64s (&r->lock_policy_res2); } @@ -1199,13 +1189,11 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct ldlm_intent *)0)->opc) == 8); /* Checks for struct ldlm_resource_desc */ - LASSERT((int)sizeof(struct ldlm_resource_desc) == 52); + LASSERT((int)sizeof(struct ldlm_resource_desc) == 40); LASSERT(offsetof(struct ldlm_resource_desc, lr_type) == 0); LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_type) == 4); - LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 4); + LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 8); LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_name) == 32); - LASSERT(offsetof(struct ldlm_resource_desc, lr_version[4]) == 52); - LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_version[4]) == 4); /* Checks for struct 
ldlm_lock_desc */ LASSERT((int)sizeof(struct ldlm_lock_desc) == 108); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 519b434..87b385d 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -144,6 +144,7 @@ EXPORT_SYMBOL(lustre_msg_size); EXPORT_SYMBOL(lustre_unpack_msg); EXPORT_SYMBOL(lustre_msg_buf); EXPORT_SYMBOL(lustre_msg_string); +EXPORT_SYMBOL(lustre_swab_buf); EXPORT_SYMBOL(lustre_swab_reqbuf); EXPORT_SYMBOL(lustre_swab_repbuf); EXPORT_SYMBOL(lustre_swab_obdo); @@ -152,6 +153,7 @@ EXPORT_SYMBOL(lustre_swab_obd_ioobj); EXPORT_SYMBOL(lustre_swab_niobuf_remote); EXPORT_SYMBOL(lustre_swab_ost_body); EXPORT_SYMBOL(lustre_swab_ost_last_id); +EXPORT_SYMBOL(lustre_swab_ost_lvb); EXPORT_SYMBOL(lustre_swab_ll_fid); EXPORT_SYMBOL(lustre_swab_mds_status_req); EXPORT_SYMBOL(lustre_swab_mds_body); diff --git a/lustre/scripts/lustre.spec.in b/lustre/scripts/lustre.spec.in index a1a5e49..7003435 100644 --- a/lustre/scripts/lustre.spec.in +++ b/lustre/scripts/lustre.spec.in @@ -1,5 +1,5 @@ # lustre.spec -%define version HEAD +%define version b_size %define kversion @LINUXRELEASE@ %define linuxdir @LINUX@ %define enable_doc @ENABLE_DOC@ diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index ec166e4..f4a7f22 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -1277,7 +1277,7 @@ run_test 42b "test destroy of file with cached dirty data ======" # any number of fixes (don't get {0,EOF} on open, match # composite locks, do smarter file size management) fix # this, but for now we want these tests to verify that -# the cancelation with truncate intent works, so we +# the cancellation with truncate intent works, so we # start the file with a full-file pw lock to match against # until the truncate. 
trunc_test() { @@ -1318,7 +1318,7 @@ run_test 42d "test complete truncate of file with cached dirty data" test_43() { mkdir $DIR/d43 cp -p /bin/ls $DIR/d43/f - exec 100>> $DIR/d43/f + exec 100>> $DIR/d43/f $DIR/d43/f && error || true exec 100<&- } @@ -1327,8 +1327,7 @@ run_test 43 "execution of file opened for write should return -ETXTBSY" test_43a() { mkdir -p $DIR/d43 cp -p `which multiop` $DIR/d43/multiop - touch $DIR/d43/g - $DIR/d43/multiop $DIR/d43/g o_c & + $DIR/d43/multiop $TMP/test43.junk O_c & MULTIPID=$! sleep 1 multiop $DIR/d43/multiop Oc && error "expected error, got success" @@ -1340,8 +1339,7 @@ run_test 43a "open(RDWR) of file being executed should return -ETXTBSY" test_43b() { mkdir -p $DIR/d43 cp -p `which multiop` $DIR/d43/multiop - touch $DIR/d43/g - $DIR/d43/multiop $DIR/d43/g o_c & + $DIR/d43/multiop $TMP/test43.junk O_c & MULTIPID=$! sleep 1 truncate $DIR/d43/multiop 0 && error "expected error, got success" @@ -1429,7 +1427,7 @@ test_45() { do_dirty_record "echo blah > $f" [ $before -eq $after ] && error "write wasn't cached" do_dirty_record "cancel_lru_locks OSC" - [ $before -gt $after ] || error "lock cancelation didn't lower dirty count" + [ $before -gt $after ] || error "lock cancellation didn't lower dirty count" start_kupdated } run_test 45 "osc io page accounting ============================" @@ -1711,7 +1709,6 @@ test_62() { cancel_lru_locks OSC echo 0x405 > /proc/sys/lustre/fail_loc cat $f && error "cat succeeded, expect -EIO" - multiop $f Owc && error "multiop succeeded, expect -EIO" echo 0 > /proc/sys/lustre/fail_loc } run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)" diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index 37f3c96..6c8172b 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -284,11 +284,44 @@ test_13() { # bug 2451 - directory coherency } run_test 13 "test directory page revocation ====================" -test_14() { # bug 974 - ENOSPC +test_14() { + mkdir $DIR1/d14 + 
cp -p /bin/ls $DIR1/d14/ls + exec 100>> $DIR1/d14/ls + $DIR2/d14/ls && error || true + exec 100<&- +} +run_test 14 "execution of file opened for write should return -ETXTBSY==" + +test_14a() { + mkdir -p $DIR1/d14 + cp -p `which multiop` $DIR1/d14/multiop + $DIR1/d14/multiop $TMP/test14.junk O_c & + MULTIPID=$! + sleep 1 + multiop $DIR2/d14/multiop Oc && error "expected error, got success" + kill -USR1 $MULTIPID || return 2 + wait $MULTIPID || return 3 +} +run_test 14a "open(RDWR) of file being executed should return -ETXTBSY" + +test_14b() { + mkdir -p $DIR1/d14 + cp -p `which multiop` $DIR1/d14/multiop + $DIR1/d14/multiop $TMP/test14.junk O_c & + MULTIPID=$! + sleep 1 + truncate $DIR2/d14/multiop 0 && error "expected error, got success" + kill -USR1 $MULTIPID || return 2 + wait $MULTIPID || return 3 +} +run_test 14b "truncate of file being executed should return -ETXTBSY" + +test_15() { # bug 974 - ENOSPC env sh oos2.sh $MOUNT1 $MOUNT2 } -run_test 14 "test out-of-space with multiple writers ===========" +run_test 15 "test out-of-space with multiple writers ===========" log "cleanup: ======================================================" rm -rf $DIR1/[df][0-9]* $DIR1/lnk || true diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 8beb802..d33fc7a 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -431,7 +431,6 @@ check_ldlm_resource_desc(void) CHECK_STRUCT(ldlm_resource_desc); CHECK_MEMBER(ldlm_resource_desc, lr_type); CHECK_MEMBER(ldlm_resource_desc, lr_name); - CHECK_MEMBER(ldlm_resource_desc, lr_version[RES_VERSION_SIZE]); } void @@ -443,7 +442,6 @@ check_ldlm_lock_desc(void) CHECK_MEMBER(ldlm_lock_desc, l_req_mode); CHECK_MEMBER(ldlm_lock_desc, l_granted_mode); CHECK_MEMBER(ldlm_lock_desc, l_policy_data); - CHECK_MEMBER(ldlm_lock_desc, l_version[RES_VERSION_SIZE]); } void @@ -463,10 +461,8 @@ check_ldlm_reply(void) BLANK_LINE(); CHECK_STRUCT(ldlm_reply); CHECK_MEMBER(ldlm_reply, lock_flags); - CHECK_MEMBER(ldlm_reply, 
lock_mode); - CHECK_MEMBER(ldlm_reply, lock_resource_name); + CHECK_MEMBER(ldlm_reply, lock_desc); CHECK_MEMBER(ldlm_reply, lock_handle); - CHECK_MEMBER(ldlm_reply, lock_policy_data); CHECK_MEMBER(ldlm_reply, lock_policy_res1); CHECK_MEMBER(ldlm_reply, lock_policy_res2); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 9f8bcd0f..cdc17b8 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -23,8 +23,8 @@ int main() void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' - * running on Linux schnapps.adilger.int 2.4.22-l32 #4 Thu Jan 8 14:32:57 MST 2004 i686 i686 - * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */ + * running on Linux adevi 2.4.18-p4smp-10pre1 #1 SMP Thu Feb 5 14:52:15 PST 2004 i686 unknown + * with gcc version 2.96 20000731 (Red Hat Linux 7.3 2.96-113) */ /* Constants... */ @@ -93,7 +93,7 @@ void lustre_assert_wire_constants(void) LASSERT(LDLM_CANCEL == 103); LASSERT(LDLM_BL_CALLBACK == 104); LASSERT(LDLM_CP_CALLBACK == 105); - LASSERT(LDLM_LAST_OPC == 106); + LASSERT(LDLM_LAST_OPC == 107); LASSERT(LCK_EX == 1); LASSERT(LCK_PW == 2); LASSERT(LCK_PR == 3); @@ -546,53 +546,45 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct ldlm_intent *)0)->opc) == 8); /* Checks for struct ldlm_resource_desc */ - LASSERT((int)sizeof(struct ldlm_resource_desc) == 52); + LASSERT((int)sizeof(struct ldlm_resource_desc) == 40); LASSERT(offsetof(struct ldlm_resource_desc, lr_type) == 0); LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_type) == 4); - LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 4); + LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 8); LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_name) == 32); - LASSERT(offsetof(struct ldlm_resource_desc, lr_version[4]) == 52); - LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_version[4]) == 4); /* Checks for struct ldlm_lock_desc */ - LASSERT((int)sizeof(struct 
ldlm_lock_desc) == 108); + LASSERT((int)sizeof(struct ldlm_lock_desc) == 80); LASSERT(offsetof(struct ldlm_lock_desc, l_resource) == 0); - LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_resource) == 52); - LASSERT(offsetof(struct ldlm_lock_desc, l_req_mode) == 52); + LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_resource) == 40); + LASSERT(offsetof(struct ldlm_lock_desc, l_req_mode) == 40); LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_req_mode) == 4); - LASSERT(offsetof(struct ldlm_lock_desc, l_granted_mode) == 56); + LASSERT(offsetof(struct ldlm_lock_desc, l_granted_mode) == 44); LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_granted_mode) == 4); - LASSERT(offsetof(struct ldlm_lock_desc, l_policy_data) == 60); + LASSERT(offsetof(struct ldlm_lock_desc, l_policy_data) == 48); LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_policy_data) == 32); - LASSERT(offsetof(struct ldlm_lock_desc, l_version[4]) == 108); - LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_version[4]) == 4); /* Checks for struct ldlm_request */ - LASSERT((int)sizeof(struct ldlm_request) == 128); + LASSERT((int)sizeof(struct ldlm_request) == 104); LASSERT(offsetof(struct ldlm_request, lock_flags) == 0); LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4); - LASSERT(offsetof(struct ldlm_request, lock_desc) == 4); - LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 108); - LASSERT(offsetof(struct ldlm_request, lock_handle1) == 112); + LASSERT(offsetof(struct ldlm_request, lock_desc) == 8); + LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80); + LASSERT(offsetof(struct ldlm_request, lock_handle1) == 88); LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8); - LASSERT(offsetof(struct ldlm_request, lock_handle2) == 120); + LASSERT(offsetof(struct ldlm_request, lock_handle2) == 96); LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8); /* Checks for struct ldlm_reply */ - LASSERT((int)sizeof(struct 
ldlm_reply) == 96); + LASSERT((int)sizeof(struct ldlm_reply) == 108); LASSERT(offsetof(struct ldlm_reply, lock_flags) == 0); LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4); - LASSERT(offsetof(struct ldlm_reply, lock_mode) == 4); - LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_mode) == 4); - LASSERT(offsetof(struct ldlm_reply, lock_resource_name) == 8); - LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_resource_name) == 32); - LASSERT(offsetof(struct ldlm_reply, lock_handle) == 40); + LASSERT(offsetof(struct ldlm_reply, lock_desc) == 4); + LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80); + LASSERT(offsetof(struct ldlm_reply, lock_handle) == 84); LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8); - LASSERT(offsetof(struct ldlm_reply, lock_policy_data) == 48); - LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_policy_data) == 32); - LASSERT(offsetof(struct ldlm_reply, lock_policy_res1) == 80); + LASSERT(offsetof(struct ldlm_reply, lock_policy_res1) == 92); LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_policy_res1) == 8); - LASSERT(offsetof(struct ldlm_reply, lock_policy_res2) == 88); + LASSERT(offsetof(struct ldlm_reply, lock_policy_res2) == 100); LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_policy_res2) == 8); /* Checks for struct ptlbd_op */ -- 1.8.3.1