From: adilger Date: Wed, 3 Mar 2004 09:56:05 +0000 (+0000) Subject: Land b_smallfix onto b1_2 (20040302_2126) X-Git-Tag: v1_8_0_110~486^6~111 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=5d93be0df66ca35b908a47cf4a6ac449abbaaf28;p=fs%2Flustre-release.git Land b_smallfix onto b1_2 (20040302_2126) b=2809, b=2706, b=2816, b=1987, b=2884, b=1191, b=2765, b=2805 --- diff --git a/lnet/lnet/api-init.c b/lnet/lnet/api-init.c index b811391..e2921ac 100644 --- a/lnet/lnet/api-init.c +++ b/lnet/lnet/api-init.c @@ -29,7 +29,7 @@ int ptl_init; unsigned int portal_subsystem_debug = ~0 - (S_PORTALS | S_QSWNAL | S_SOCKNAL | S_GMNAL | S_IBNAL); unsigned int portal_debug = (D_WARNING | D_DLMTRACE | D_ERROR | D_EMERG | D_HA | - D_RPCTRACE | D_VFSTRACE); + D_RPCTRACE | D_VFSTRACE | D_MALLOC); unsigned int portal_cerror = 1; unsigned int portal_printk; unsigned int portal_stack; diff --git a/lustre/ChangeLog b/lustre/ChangeLog index c4950b4..b8ca8c9 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -48,6 +48,14 @@ tbd Cluster File Systems, Inc. 
- don't write pages to disk if there was an error (1450) - don't ping imports that have recovery disabled (2676) - take buffered bytes into account when balancing socknal conn (2817) + - hold a DLM lock over readdir always, use truncate_inode_pages (2706) + - reconnect unlink llog connection after MDS reconnects to OST (2816) + - remove little-endian swabbing of llog records (1987) + - set/limit i_blksize to LL_MAX_BLKSIZE on client (2884) + - retry reposting request buffers if they fail (1191) + - grow extent at grant time to avoid granting a revoked lock (2809) + - lock revoke doesn't evict page if covered by a second lock (2765) + - disable VM readahead to avoid reading outside lock extents (2805) * miscellania - return LL_SUPER_MAGIC from statfs for the filesystem type (1972) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index e37dcb1..b8515a3 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -8,7 +8,7 @@ #ifdef __KERNEL__ # include -#endif +#endif #include #include @@ -91,15 +91,13 @@ typedef enum { #define LDLM_CB_BLOCKING 1 #define LDLM_CB_CANCELING 2 -#define L2B(c) (1 << c) - /* compatibility matrix */ -#define LCK_COMPAT_EX L2B(LCK_NL) -#define LCK_COMPAT_PW (LCK_COMPAT_EX | L2B(LCK_CR)) -#define LCK_COMPAT_PR (LCK_COMPAT_PW | L2B(LCK_PR)) -#define LCK_COMPAT_CW (LCK_COMPAT_PW | L2B(LCK_CW)) -#define LCK_COMPAT_CR (LCK_COMPAT_CW | L2B(LCK_PR) | L2B(LCK_PW)) -#define LCK_COMPAT_NL (LCK_COMPAT_CR | L2B(LCK_EX)) +#define LCK_COMPAT_EX LCK_NL +#define LCK_COMPAT_PW (LCK_COMPAT_EX | LCK_CR) +#define LCK_COMPAT_PR (LCK_COMPAT_PW | LCK_PR) +#define LCK_COMPAT_CW (LCK_COMPAT_PW | LCK_CW) +#define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW) +#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX) static ldlm_mode_t lck_compat_array[] = { [LCK_EX] LCK_COMPAT_EX, @@ -110,12 +108,14 @@ static ldlm_mode_t lck_compat_array[] = { [LCK_NL] LCK_COMPAT_NL }; -static inline int 
lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) +static inline void lockmode_verify(ldlm_mode_t mode) { - LASSERT(exist >= LCK_EX && exist <= LCK_NL); - LASSERT(new >= LCK_EX && new <= LCK_NL); + LASSERT(mode >= LCK_EX && mode <= LCK_NL); +} - return (lck_compat_array[exist] & L2B(new)); +static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) +{ + return (lck_compat_array[exist] & new); } /* @@ -133,8 +133,8 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) - */ -struct ldlm_lock; -struct ldlm_resource; +struct ldlm_lock; +struct ldlm_resource; struct ldlm_namespace; typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **, @@ -155,7 +155,7 @@ struct ldlm_namespace { struct list_head ns_root_list; /* all root resources in ns */ struct lustre_lock ns_lock; /* protects hash, refcount, list */ struct list_head ns_list_chain; /* position in global NS list */ - /* + /* struct proc_dir_entry *ns_proc_dir; */ @@ -200,8 +200,6 @@ struct ldlm_lock { struct list_head l_lru; struct list_head l_res_link; // position in one of three res lists struct list_head l_export_chain; // per-export chain of locks - struct list_head l_pending_chain; // locks with callbacks pending - unsigned long l_callback_timeout; ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; @@ -209,22 +207,13 @@ struct ldlm_lock { ldlm_completion_callback l_completion_ast; ldlm_blocking_callback l_blocking_ast; ldlm_glimpse_callback l_glimpse_ast; - void *l_ast_data; struct obd_export *l_export; - /* XXX phil can fix this, I'm sure */ struct obd_export *l_conn_export; -// struct lustre_handle *l_connh; __u32 l_flags; struct lustre_handle l_remote_handle; ldlm_policy_data_t l_policy_data; - /* This LVB is used only on the client side, as temporary storage for - * a lock value block received during an enqueue */ - __u32 l_lvb_len; - void *l_lvb_data; - void *l_lvb_swabber; - __u32 l_readers; __u32 l_writers; __u8 l_destroyed; @@ -234,9 +223,20 @@ struct 
ldlm_lock { * on this waitq to learn when it becomes granted. */ wait_queue_head_t l_waitq; struct timeval l_enqueued_time; - unsigned long l_last_used; /* jiffies */ -}; + unsigned long l_last_used; /* jiffies */ + struct ldlm_extent l_req_extent; + + /* Client-side-only members */ + __u32 l_lvb_len; /* temporary storage for */ + void *l_lvb_data; /* an LVB received during */ + void *l_lvb_swabber; /* an enqueue */ + void *l_ast_data; + + /* Server-side-only members */ + struct list_head l_pending_chain; /* callbacks pending */ + unsigned long l_callback_timeout; +}; #define LDLM_PLAIN 10 #define LDLM_EXTENT 11 @@ -308,7 +308,8 @@ do { \ CDEBUG(level, "### " format \ " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\ - "] flags: %x remote: "LPX64" expref: %d\n" , ## a, \ + "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64 \ + " expref: %d\n" , ## a, \ lock->l_resource->lr_namespace->ns_name, lock, \ lock->l_handle.h_cookie, atomic_read(&lock->l_refc), \ lock->l_readers, lock->l_writers, \ @@ -320,6 +321,7 @@ do { \ ldlm_typename[lock->l_resource->lr_type], \ lock->l_policy_data.l_extent.start, \ lock->l_policy_data.l_extent.end, \ + lock->l_req_extent.start, lock->l_req_extent.end, \ lock->l_flags, lock->l_remote_handle.cookie, \ lock->l_export ? 
\ atomic_read(&lock->l_export->exp_refcount) : -99); \ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index efb441a..cd966c4 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -716,11 +716,11 @@ extern void lustre_swab_ldlm_res_id (struct ldlm_res_id *id); /* lock types */ typedef enum { LCK_EX = 1, - LCK_PW, - LCK_PR, - LCK_CW, - LCK_CR, - LCK_NL + LCK_PW = 2, + LCK_PR = 4, + LCK_CW = 8, + LCK_CR = 16, + LCK_NL = 32 } ldlm_mode_t; struct ldlm_extent { diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index 200d83f..1d0ff9f 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -134,7 +134,8 @@ int llog_catlog_list(struct obd_device *obd, int count, int llog_initiator_connect(struct llog_ctxt *ctxt); int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp); int llog_origin_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen); + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid); int llog_handle_connect(struct ptlrpc_request *req); /* recov_thread.c */ @@ -143,7 +144,8 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct llog_cookie *cookies, int flags); int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp); int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen); + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid); struct llog_operations { int (*lop_write_rec)(struct llog_handle *loghandle, @@ -169,7 +171,8 @@ struct llog_operations { int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); int (*lop_connect)(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen); + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid); /* XXX add 2 more: 
commit callbacks and llog recovery functions */ }; @@ -268,10 +271,10 @@ static inline int llog_write_rec(struct llog_handle *handle, RETURN(-EOPNOTSUPP); if (buf) - buflen = le32_to_cpu(rec->lrh_len) + sizeof(struct llog_rec_hdr) + buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) + sizeof(struct llog_rec_tail); else - buflen = le32_to_cpu(rec->lrh_len); + buflen = rec->lrh_len; LASSERT(size_round(buflen) == buflen); rc = lop->lop_write_rec(handle, rec, logcookies, numcookies, buf, idx); @@ -367,7 +370,8 @@ static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res, } static inline int llog_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen) + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid) { struct llog_operations *lop; int rc; @@ -379,7 +383,7 @@ static inline int llog_connect(struct llog_ctxt *ctxt, int count, if (lop->lop_connect == NULL) RETURN(-EOPNOTSUPP); - rc = lop->lop_connect(ctxt, count, logid, gen); + rc = lop->lop_connect(ctxt, count, logid, gen, uuid); RETURN(rc); } diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index ca7d26e..13ce57e 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -405,7 +405,6 @@ struct ptlrpc_request_buffer_desc { struct ptlrpc_srv_ni *rqbd_srv_ni; ptl_handle_md_t rqbd_md_h; int rqbd_refcount; - int rqbd_eventcount; char *rqbd_buffer; struct ptlrpc_cb_id rqbd_cbid; struct ptlrpc_request rqbd_req; @@ -426,7 +425,7 @@ struct ptlrpc_srv_ni { /* Interface-specific service state */ struct ptlrpc_service *sni_service; /* owning service */ struct ptlrpc_ni *sni_ni; /* network interface */ - struct list_head sni_rqbds; /* all the request buffers */ + struct list_head sni_active_rqbds; /* req buffers receiving */ struct list_head sni_active_replies; /* all the active replies */ int sni_nrqbd_receiving; /* # posted request buffers */ }; @@ -441,6 +440,7 @@ struct 
ptlrpc_service { int srv_nthreads; /* # running threads */ int srv_n_difficult_replies; /* # 'difficult' replies */ int srv_n_active_reqs; /* # reqs being served */ + int srv_rqbd_timeout; /* timeout before re-posting reqs */ __u32 srv_req_portal; __u32 srv_rep_portal; @@ -448,6 +448,8 @@ struct ptlrpc_service { int srv_n_queued_reqs; /* # reqs waiting to be served */ struct list_head srv_request_queue; /* reqs waiting for service */ + struct list_head srv_idle_rqbds; /* request buffers to be reposted */ + atomic_t srv_outstanding_replies; struct list_head srv_reply_queue; /* replies waiting for service */ @@ -510,7 +512,7 @@ int ptlrpc_reply(struct ptlrpc_request *req); int ptlrpc_error(struct ptlrpc_request *req); void ptlrpc_resend_req(struct ptlrpc_request *request); int ptl_send_rpc(struct ptlrpc_request *request); -void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd); +int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd); /* ptlrpc/client.c */ void ptlrpc_init_client(int req_portal, int rep_portal, char *name, diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 4711346..ca545a9 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -77,7 +77,8 @@ struct lov_stripe_md { /* Public members. */ __u64 lsm_object_id; /* lov object id */ __u64 lsm_object_gr; /* lov object id */ - __u64 lsm_maxbytes; + __u64 lsm_maxbytes; /* maximum possible file size */ + unsigned long lsm_xfersize; /* optimal transfer size */ /* LOV-private members start here -- only for use in lov/. 
*/ __u32 lsm_magic; diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index 7b232ea..41fb301 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -122,6 +122,7 @@ extern unsigned int obd_sync_filter; #define OBD_FAIL_PTLRPC 0x500 #define OBD_FAIL_PTLRPC_ACK 0x501 +#define OBD_FAIL_PTLRPC_RQBD 0x502 #define OBD_FAIL_OBD_PING_NET 0x600 #define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601 diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 32fb89d..c2c1e25 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -36,67 +36,101 @@ * - the maximum extent * - containing the requested extent * - and not overlapping existing conflicting extents outside the requested one - * - * An alternative policy is to not shrink the new extent when conflicts exist */ + */ static void ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req, struct ldlm_extent *new_ex) { struct list_head *tmp; ldlm_mode_t req_mode = req->l_req_mode; - __u64 req_start = req->l_policy_data.l_extent.start; - __u64 req_end = req->l_policy_data.l_extent.end; + __u64 req_start = req->l_req_extent.start; + __u64 req_end = req->l_req_extent.end; ENTRY; - if (new_ex->start == req_start && new_ex->end == req_end) { - EXIT; - return; - } + lockmode_verify(req_mode); list_for_each(tmp, queue) { struct ldlm_lock *lock; + struct ldlm_extent *l_extent; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + l_extent = &lock->l_policy_data.l_extent; - if (req == lock) { + if (new_ex->start == req_start && new_ex->end == req_end) { EXIT; return; } - /* if lock doesn't overlap new_ex, skip it. */ - if (lock->l_policy_data.l_extent.end < new_ex->start || - lock->l_policy_data.l_extent.start > new_ex->end) + /* Don't conflict with ourselves */ + if (req == lock) + continue; + + /* If lock doesn't overlap new_ex, skip it. 
*/ + if (l_extent->end < new_ex->start || + l_extent->start > new_ex->end) continue; /* Locks are compatible, overlap doesn't matter */ if (lockmode_compat(lock->l_req_mode, req_mode)) continue; - if (lock->l_policy_data.l_extent.start < req_start) { - if (lock->l_policy_data.l_extent.end == ~0) { + /* Locks conflicting in requested extents and we can't satisfy + * both locks, so ignore it. Either we will ping-pong this + * extent (we would regardless of what extent we granted) or + * lock is unused and it shouldn't limit our extent growth. */ + if (lock->l_req_extent.end >= req_start && + lock->l_req_extent.start <= req_end) + continue; + + /* We grow extents downwards only as far as they don't overlap + * with already-granted locks, on the assumtion that clients + * will be writing beyond the initial requested end and would + * then need to enqueue a new lock beyond the previous request. + * We don't grow downwards if there are lots of lockers. */ + if (l_extent->start < req_start) { + if (atomic_read(&req->l_resource->lr_refcount) > 20) new_ex->start = req_start; - new_ex->end = req_end; - EXIT; - return; - } - new_ex->start = min(lock->l_policy_data.l_extent.end+1, - req_start); + else + new_ex->start = min(l_extent->end+1, req_start); } - if (lock->l_policy_data.l_extent.end > req_end) { - if (lock->l_policy_data.l_extent.start == 0) { - new_ex->start = req_start; - new_ex->end = req_end; - EXIT; - return; - } - new_ex->end = MAX(lock->l_policy_data.l_extent.start-1, - req_end); + /* If we need to cancel this lock anyways because our request + * overlaps the granted lock, we grow up to its requested + * extent start instead of limiting this extent, assuming that + * clients are writing forwards and the lock had over grown + * its extent downwards before we enqueued our request. 
*/ + if (l_extent->end > req_end) { + if (l_extent->start <= req_end) + new_ex->end = max(lock->l_req_extent.start - 1, + req_end); + else + new_ex->end = max(l_extent->start - 1, req_end); } } EXIT; } -/* Determine if the lock is compatible with all locks on the queue. */ +/* In order to determine the largest possible extent we can grant, we need + * to scan all of the queues. */ +static void ldlm_extent_policy(struct ldlm_resource *res, + struct ldlm_lock *lock, int *flags) +{ + struct ldlm_extent new_ex = { .start = 0, .end = ~0}; + + ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex); + ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex); + + if (new_ex.start != lock->l_policy_data.l_extent.start || + new_ex.end != lock->l_policy_data.l_extent.end) { + *flags |= LDLM_FL_LOCK_CHANGED; + lock->l_policy_data.l_extent.start = new_ex.start; + lock->l_policy_data.l_extent.end = new_ex.end; + } +} + +/* Determine if the lock is compatible with all locks on the queue. + * We stop walking the queue if we hit ourselves so we don't take + * conflicting locks enqueued after us into accound, or we'd wait forever. 
*/ static int ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, int send_cbs) @@ -104,11 +138,13 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, struct list_head *tmp; struct ldlm_lock *lock; ldlm_mode_t req_mode = req->l_req_mode; - __u64 req_start = req->l_policy_data.l_extent.start; - __u64 req_end = req->l_policy_data.l_extent.end; + __u64 req_start = req->l_req_extent.start; + __u64 req_end = req->l_req_extent.end; int compat = 1; ENTRY; + lockmode_verify(req_mode); + list_for_each(tmp, queue) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); @@ -148,7 +184,6 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err) { struct ldlm_resource *res = lock->l_resource; - struct ldlm_extent new_ex = {0, ~0}; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); int rc; ENTRY; @@ -165,22 +200,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); + + ldlm_extent_policy(res, lock, flags); ldlm_grant_lock(lock, NULL, 0, 1); RETURN(LDLM_ITER_CONTINUE); } - /* In order to determine the largest possible extent we can - * grant, we need to scan all of the queues. 
*/ - ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex); - ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex); - - if (new_ex.start != lock->l_policy_data.l_extent.start || - new_ex.end != lock->l_policy_data.l_extent.end) { - *flags |= LDLM_FL_LOCK_CHANGED; - lock->l_policy_data.l_extent.start = new_ex.start; - lock->l_policy_data.l_extent.end = new_ex.end; - } - restart: LASSERT(res->lr_tmp == NULL); res->lr_tmp = &rpc_list; @@ -204,6 +229,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; } else { + ldlm_extent_policy(res, lock, flags); ldlm_resource_unlink_lock(lock); ldlm_grant_lock(lock, NULL, 0, 0); } diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 181c72e..148be59 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -150,6 +150,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, } } } else { + lockmode_verify(mode); + /* This loop determines if there are existing locks * that conflict with the new lock request. 
*/ list_for_each(tmp, &res->lr_granted) { @@ -164,7 +166,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, /* locks are compatible, overlap doesn't matter */ if (lockmode_compat(lock->l_granted_mode, mode)) continue; - + if (!ldlm_flocks_overlap(lock, req)) continue; diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 4186f5c..4111cbe 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -19,6 +19,10 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue); int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list); +/* ldlm_lockd.c */ +int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct ldlm_lock *lock); + /* ldlm_plain.c */ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err); diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index c5643db..bcaed00 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -255,6 +255,7 @@ int client_connect_import(struct lustre_handle *dlm_handle, GOTO(out_ldlm, rc); } + ptlrpc_pinger_add_import(imp); EXIT; if (rc) { diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index bb0c0c1..1234786 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -430,9 +430,9 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) { l_lock(&lock->l_resource->lr_namespace->ns_lock); ldlm_lock_remove_from_lru(lock); - if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) + if (mode & (LCK_NL | LCK_CR | LCK_PR)) lock->l_readers++; - else + if (mode & (LCK_EX | LCK_CW | LCK_PW)) lock->l_writers++; lock->l_last_used = jiffies; l_unlock(&lock->l_resource->lr_namespace->ns_lock); @@ -448,10 +448,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", 
ldlm_lockname[mode]); ns = lock->l_resource->lr_namespace; l_lock(&ns->ns_lock); - if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) { + if (mode & (LCK_NL | LCK_CR | LCK_PR)) { LASSERT(lock->l_readers > 0); lock->l_readers--; - } else { + } + if (mode & (LCK_EX | LCK_CW | LCK_PW)) { LASSERT(lock->l_writers > 0); lock->l_writers--; } @@ -473,13 +474,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) "warning\n"); LDLM_DEBUG(lock, "final decref done on cbpending lock"); - l_unlock(&ns->ns_lock); - l_check_no_ns_lock(ns); - /* FIXME: need a real 'desc' here */ - if (lock->l_blocking_ast != NULL) - lock->l_blocking_ast(lock, NULL, lock->l_ast_data, - LDLM_CB_BLOCKING); + LDLM_LOCK_GET(lock); /* dropped by bl thread */ + ldlm_lock_remove_from_lru(lock); + ldlm_bl_to_thread(ns, NULL, lock); + l_unlock(&ns->ns_lock); } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT && !lock->l_readers && !lock->l_writers) { /* If this is a client-side namespace and this was the last @@ -577,7 +576,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, lock->l_readers == 0 && lock->l_writers == 0) continue; - if (lock->l_req_mode != mode) + if (!(lock->l_req_mode & mode)) continue; if (lock->l_resource->lr_type == LDLM_EXTENT && @@ -593,7 +592,10 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, !(lock->l_flags & LDLM_FL_LOCAL)) continue; - ldlm_lock_addref_internal(lock, mode); + if (flags & LDLM_FL_TEST_LOCK) + LDLM_LOCK_GET(lock); + else + ldlm_lock_addref_internal(lock, mode); return lock; } @@ -622,6 +624,8 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock) * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked * to be canceled can still be matched as long as they still have reader * or writer refernces + * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock, + * just tell us if we would have matched. 
* * Returns 1 if it finds an already-existing lock that is compatible; in this * case, lockh is filled in with a addref()ed lock @@ -691,6 +695,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, if (old_lock) LDLM_LOCK_PUT(old_lock); + if (flags & LDLM_FL_TEST_LOCK && rc) + LDLM_LOCK_PUT(lock); return rc; } @@ -1041,7 +1047,7 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp) struct ldlm_resource *res; l_lock(&ns->ns_lock); - while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) { + while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) { lock = list_entry(exp->exp_ldlm_data.led_held_locks.next, struct ldlm_lock, l_export_chain); res = ldlm_resource_getref(lock->l_resource); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index ef2424e..5765d8c 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -51,7 +51,7 @@ static int ldlm_refcount = 0; /* LDLM state */ -static struct ldlm_state *ldlm ; +static struct ldlm_state *ldlm_state; inline unsigned long round_timeout(unsigned long timeout) { @@ -625,6 +625,9 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data, sizeof(ldlm_policy_data_t)); + if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) + memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent, + sizeof(lock->l_req_extent)); err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags); if (err) @@ -924,10 +927,10 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) } #ifdef __KERNEL__ -static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns, - struct ldlm_lock_desc *ld, struct ldlm_lock *lock) +int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct ldlm_lock *lock) { - struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool; + struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; struct ldlm_bl_work_item 
*blwi; ENTRY; @@ -936,7 +939,8 @@ static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns, RETURN(-ENOMEM); blwi->blwi_ns = ns; - blwi->blwi_ld = *ld; + if (ld != NULL) + blwi->blwi_ld = *ld; blwi->blwi_lock = lock; spin_lock(&blp->blp_lock); @@ -1063,7 +1067,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) case LDLM_BL_CALLBACK: CDEBUG(D_INODE, "blocking ast\n"); #ifdef __KERNEL__ - rc = ldlm_bl_to_thread(ldlm, ns, &dlm_req->lock_desc, lock); + rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock); ldlm_callback_reply(req, rc); #else rc = 0; @@ -1239,11 +1243,11 @@ static int ldlm_setup(void) #endif ENTRY; - if (ldlm != NULL) + if (ldlm_state != NULL) RETURN(-EALREADY); - OBD_ALLOC(ldlm, sizeof(*ldlm)); - if (ldlm == NULL) + OBD_ALLOC(ldlm_state, sizeof(*ldlm_state)); + if (ldlm_state == NULL) RETURN(-ENOMEM); #ifdef __KERNEL__ @@ -1252,25 +1256,25 @@ static int ldlm_setup(void) GOTO(out_free, rc); #endif - ldlm->ldlm_cb_service = + ldlm_state->ldlm_cb_service = ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, ldlm_callback_handler, "ldlm_cbd", ldlm_svc_proc_dir); - if (!ldlm->ldlm_cb_service) { + if (!ldlm_state->ldlm_cb_service) { CERROR("failed to start service\n"); GOTO(out_proc, rc = -ENOMEM); } - ldlm->ldlm_cancel_service = - ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, + ldlm_state->ldlm_cancel_service = + ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL, LDLM_CANCEL_REPLY_PORTAL, ldlm_cancel_handler, "ldlm_canceld", ldlm_svc_proc_dir); - if (!ldlm->ldlm_cancel_service) { + if (!ldlm_state->ldlm_cancel_service) { CERROR("failed to start service\n"); GOTO(out_proc, rc = -ENOMEM); } @@ -1278,7 +1282,7 @@ static int ldlm_setup(void) OBD_ALLOC(blp, sizeof(*blp)); if (blp == NULL) GOTO(out_proc, rc = -ENOMEM); - ldlm->ldlm_bl_pool = blp; + ldlm_state->ldlm_bl_pool = blp; atomic_set(&blp->blp_num_threads, 0); 
init_waitqueue_head(&blp->blp_waitq); @@ -1302,14 +1306,14 @@ static int ldlm_setup(void) wait_for_completion(&blp->blp_comp); } - rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cancel_service, + rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service, LDLM_NUM_THREADS, "ldlm_cn"); if (rc) { LBUG(); GOTO(out_thread, rc); } - rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cb_service, + rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service, LDLM_NUM_THREADS, "ldlm_cb"); if (rc) { LBUG(); @@ -1341,8 +1345,8 @@ static int ldlm_setup(void) #ifdef __KERNEL__ out_thread: - ptlrpc_unregister_service(ldlm->ldlm_cancel_service); - ptlrpc_unregister_service(ldlm->ldlm_cb_service); + ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); + ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); #endif out_proc: @@ -1350,15 +1354,15 @@ static int ldlm_setup(void) ldlm_proc_cleanup(); out_free: #endif - OBD_FREE(ldlm, sizeof(*ldlm)); - ldlm = NULL; + OBD_FREE(ldlm_state, sizeof(*ldlm_state)); + ldlm_state = NULL; return rc; } static int ldlm_cleanup(int force) { #ifdef __KERNEL__ - struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool; + struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; #endif ENTRY; @@ -1383,10 +1387,10 @@ static int ldlm_cleanup(int force) } OBD_FREE(blp, sizeof(*blp)); - ptlrpc_stop_all_threads(ldlm->ldlm_cb_service); - ptlrpc_unregister_service(ldlm->ldlm_cb_service); - ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service); - ptlrpc_unregister_service(ldlm->ldlm_cancel_service); + ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service); + ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); + ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service); + ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); ldlm_proc_cleanup(); expired_lock_thread.elt_state = ELT_TERMINATE; @@ -1396,8 +1400,8 @@ static int ldlm_cleanup(int force) #endif - OBD_FREE(ldlm, sizeof(*ldlm)); - ldlm = NULL; + OBD_FREE(ldlm_state, sizeof(*ldlm_state)); + ldlm_state = 
NULL; RETURN(0); } @@ -1498,6 +1502,8 @@ EXPORT_SYMBOL(ldlm_regression_stop); EXPORT_SYMBOL(ldlm_namespace_new); EXPORT_SYMBOL(ldlm_namespace_cleanup); EXPORT_SYMBOL(ldlm_namespace_free); +EXPORT_SYMBOL(ldlm_namespace_dump); +EXPORT_SYMBOL(ldlm_dump_all_namespaces); EXPORT_SYMBOL(ldlm_resource_get); EXPORT_SYMBOL(ldlm_resource_putref); diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index 9b2af34..9a693e3 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -43,6 +43,8 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, int compat = 1; ENTRY; + lockmode_verify(req_mode); + list_for_each(tmp, queue) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 30e650b..01e4562 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -168,6 +168,9 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, lock->l_lvb_swabber = lvb_swabber; if (policy != NULL) memcpy(&lock->l_policy_data, policy, sizeof(*policy)); + if (type == LDLM_EXTENT) + memcpy(&lock->l_req_extent, &policy->l_extent, + sizeof(policy->l_extent)); err = ldlm_lock_enqueue(ns, &lock, policy, flags); if (err != ELDLM_OK) @@ -255,6 +258,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, lock->l_lvb_swabber = lvb_swabber; if (policy != NULL) memcpy(&lock->l_policy_data, policy, sizeof(*policy)); + if (type == LDLM_EXTENT) + memcpy(&lock->l_req_extent, &policy->l_extent, + sizeof(policy->l_extent)); LDLM_DEBUG(lock, "client-side enqueue START"); } @@ -583,9 +589,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) int ldlm_cancel_lru(struct ldlm_namespace *ns) { - struct list_head *tmp, *next, list = LIST_HEAD_INIT(list); + struct list_head *tmp, *next; int count, rc = 0; - struct ldlm_ast_work *w; ENTRY; l_lock(&ns->ns_lock); @@ -609,33 +614,14 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns) * won't see this flag and call l_blocking_ast */ 
lock->l_flags |= LDLM_FL_CBPENDING; - OBD_ALLOC(w, sizeof(*w)); - LASSERT(w); - - w->w_lock = LDLM_LOCK_GET(lock); - list_add(&w->w_list, &list); + LDLM_LOCK_GET(lock); /* dropped by bl thread */ ldlm_lock_remove_from_lru(lock); + ldlm_bl_to_thread(ns, NULL, lock); if (--count == 0) break; } l_unlock(&ns->ns_lock); - - list_for_each_safe(tmp, next, &list) { - struct lustre_handle lockh; - int rc; - w = list_entry(tmp, struct ldlm_ast_work, w_list); - - ldlm_lock2handle(w->w_lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc != ELDLM_OK) - CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc); - - list_del(&w->w_list); - LDLM_LOCK_PUT(w->w_lock); - OBD_FREE(w, sizeof(*w)); - } - RETURN(rc); } diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 7733155..3f945a7 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -59,62 +59,20 @@ typedef struct ext2_dir_entry_2 ext2_dirent; static int ll_dir_readpage(struct file *file, struct page *page) { struct inode *inode = page->mapping->host; - struct ll_sb_info *sbi = ll_i2sbi(inode); struct ll_fid mdc_fid; __u64 offset; - int rc = 0; struct ptlrpc_request *request; - struct lustre_handle lockh; struct mds_body *body; - struct lookup_intent it = { .it_op = IT_READDIR }; - struct mdc_op_data data; - struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp); - struct ldlm_res_id res_id = - { .name = {inode->i_ino, (__u64)inode->i_generation} }; + int rc = 0; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); - if ((inode->i_size + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT <= page->index){ - /* XXX why do we need this exactly, and why do we think that - * an all-zero directory page is useful? 
- */ - CERROR("memsetting dir page %lu to zero (size %lld)\n", - page->index, inode->i_size); - memset(kmap(page), 0, PAGE_CACHE_SIZE); - kunmap(page); - GOTO(readpage_out, rc); - } - - rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); - if (!rc) { - ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0); - - rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR, - &data, &lockh, NULL, 0, - ldlm_completion_ast, ll_mdc_blocking_ast, - inode); - request = (struct ptlrpc_request *)it.d.lustre.it_data; - if (request) - ptlrpc_req_finished(request); - if (rc < 0) { - CERROR("lock enqueue: err: %d\n", rc); - unlock_page(page); - RETURN(rc); - } - } - ldlm_lock_dump_handle(D_OTHER, &lockh); - - if (PageUptodate(page)) { - CERROR("Explain this please?\n"); - GOTO(readpage_out, rc); - } mdc_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR); offset = page->index << PAGE_SHIFT; - rc = mdc_readpage(sbi->ll_mdc_exp, &mdc_fid, + rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &mdc_fid, offset, page, &request); if (!rc) { body = lustre_msg_buf(request->rq_repmsg, 0, sizeof (*body)); @@ -122,16 +80,12 @@ static int ll_dir_readpage(struct file *file, struct page *page) LASSERT_REPSWABBED (request, 0); /* swabbed by mdc_readpage() */ inode->i_size = body->size; + SetPageUptodate(page); } ptlrpc_req_finished(request); - EXIT; - - readpage_out: - if (!rc) - SetPageUptodate(page); unlock_page(page); - ldlm_lock_decref(&lockh, LCK_PR); + EXIT; return rc; } @@ -252,9 +206,39 @@ fail: static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) { + struct ldlm_res_id res_id = + { .name = { dir->i_ino, (__u64)dir->i_generation} }; + struct lustre_handle lockh; + struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp); struct address_space *mapping = dir->i_mapping; - struct page *page = read_cache_page(mapping, n, - (filler_t*)mapping->a_ops->readpage, NULL); + struct page *page; + int 
rc; + + rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, + &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); + if (!rc) { + struct lookup_intent it = { .it_op = IT_READDIR }; + struct ptlrpc_request *request; + struct mdc_op_data data; + + ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0); + + rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it, + LCK_PR, &data, &lockh, NULL, 0, + ldlm_completion_ast, ll_mdc_blocking_ast, dir); + + request = (struct ptlrpc_request *)it.d.lustre.it_data; + if (request) + ptlrpc_req_finished(request); + if (rc < 0) { + CERROR("lock enqueue: rc: %d\n", rc); + return ERR_PTR(rc); + } + } + ldlm_lock_dump_handle(D_OTHER, &lockh); + + page = read_cache_page(mapping, n, + (filler_t*)mapping->a_ops->readpage, NULL); if (!IS_ERR(page)) { wait_on_page(page); (void)kmap(page); @@ -265,14 +249,17 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) if (PageError(page)) goto fail; } + +out_unlock: + ldlm_lock_decref(&lockh, LCK_PR); return page; fail: ext2_put_page(page); - return ERR_PTR(-EIO); + page = ERR_PTR(-EIO); + goto out_unlock; } - /* * p is at least 6 bytes before the end of page */ @@ -305,8 +292,8 @@ static unsigned char ext2_filetype_table[EXT2_FT_MAX] = { int ll_readdir(struct file * filp, void * dirent, filldir_t filldir) { - loff_t pos = filp->f_pos; struct inode *inode = filp->f_dentry->d_inode; + loff_t pos = filp->f_pos; // XXX struct super_block *sb = inode->i_sb; unsigned offset = pos & ~PAGE_CACHE_MASK; unsigned long n = pos >> PAGE_CACHE_SHIFT; @@ -314,12 +301,14 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir) unsigned chunk_mask = ~(ext2_chunk_size(inode)-1); unsigned char *types = NULL; int need_revalidate = (filp->f_version != inode->i_version); + int rc = 0; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, - inode->i_generation, inode); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %llu/%llu\n", + inode->i_ino, 
inode->i_generation, inode, pos, inode->i_size); + if (pos > inode->i_size - EXT2_DIR_REC_LEN(1)) - GOTO(done, 0); + RETURN(0); types = ext2_filetype_table; @@ -328,15 +317,21 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir) ext2_dirent *de; struct page *page; - CDEBUG(D_EXT2, "reading %lu of dir %lu page %lu, size %llu\n", - PAGE_CACHE_SIZE, inode->i_ino, n, inode->i_size); + CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu/%lu size %llu\n", + PAGE_CACHE_SIZE, inode->i_ino, inode->i_generation, + n, npages, inode->i_size); page = ll_get_dir_page(inode, n); /* size might have been updated by mdc_readpage */ npages = dir_pages(inode); - if (IS_ERR(page)) + if (IS_ERR(page)) { + rc = PTR_ERR(page); + CERROR("error reading dir %lu/%u page %lu: rc %d\n", + inode->i_ino, inode->i_generation, n, rc); continue; + } + kaddr = page_address(page); if (need_revalidate) { offset = ext2_validate_entry(kaddr, offset, chunk_mask); @@ -349,6 +344,7 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir) int over; unsigned char d_type = DT_UNKNOWN; + rc = 0; /* no error if we return something */ if (types && de->file_type < EXT2_FT_MAX) d_type = types[de->file_type]; @@ -358,7 +354,7 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir) le32_to_cpu(de->inode), d_type); if (over) { ext2_put_page(page); - GOTO(done,0); + GOTO(done, rc); } } } @@ -369,7 +365,7 @@ done: filp->f_pos = (n << PAGE_CACHE_SHIFT) | offset; filp->f_version = inode->i_version; update_atime(inode); - RETURN(0); + RETURN(rc); } static int ll_dir_ioctl(struct inode *inode, struct file *file, diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 456156b..710c637 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -318,14 +318,18 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock) * * No one can dirty the extent until we've finished our work and they can * enqueue another lock. 
The DLM protects us from ll_file_read/write here, - * but other kernel actors could have pages locked. */ + * but other kernel actors could have pages locked. + * + * Called with the DLM lock held. */ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, struct ldlm_lock *lock, __u32 stripe) { struct ldlm_extent *extent = &lock->l_policy_data.l_extent; + ldlm_policy_data_t tmpex; unsigned long start, end, count, skip, i, j; struct page *page; - int rc, discard = lock->l_flags & LDLM_FL_DISCARD_DATA; + int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA; + struct lustre_handle lockh; ENTRY; CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n", @@ -348,7 +352,7 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, start += (start/count * skip) + (stripe * count); if (end != ~0) end += (end/count * skip) + (stripe * count); - } + } i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT; if (i < end) @@ -407,8 +411,16 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, lock_page(page); } - /* checking again to account for writeback's lock_page() */ - if (page->mapping != NULL) { + tmpex.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT; + tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1; + /* check to see if another DLM lock covers this page */ + rc2 = ldlm_lock_match(lock->l_resource->lr_namespace, + LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING | + LDLM_FL_TEST_LOCK, + &lock->l_resource->lr_name, LDLM_EXTENT, + &tmpex, LCK_PR | LCK_PW, &lockh); + if (rc2 == 0 && page->mapping != NULL) { + // checking again to account for writeback's lock_page() LL_CDEBUG_PAGE(page, "truncating\n"); ll_truncate_complete_page(page); } @@ -439,12 +451,17 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, CERROR("ldlm_cli_cancel failed: %d\n", rc); break; case LDLM_CB_CANCELING: { - struct inode *inode = ll_inode_from_lock(lock); + struct inode *inode; 
struct ll_inode_info *lli; struct lov_stripe_md *lsm; __u32 stripe; __u64 kms; + /* This lock wasn't granted, don't try to evict pages */ + if (lock->l_req_mode != lock->l_granted_mode) + RETURN(0); + + inode = ll_inode_from_lock(lock); if (inode == NULL) RETURN(0); lli = ll_i2info(inode); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index c0ca902..84c910f 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -110,7 +110,7 @@ struct ll_async_page { struct page *llap_page; struct list_head llap_pending_write; /* only trust these if the page lock is providing exclusion */ - int llap_write_queued:1, + int llap_write_queued:1, llap_defer_uptodate:1; struct list_head llap_proc_item; }; @@ -249,6 +249,8 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret); #define LL_SBI_NOLCK 0x1 #define LL_SBI_READAHEAD 0x2 +#define LL_MAX_BLKSIZE (4UL * 1024 * 1024) + #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) #define ll_s2sbi(sb) ((struct ll_sb_info *)((sb)->s_fs_info)) void __d_rehash(struct dentry * entry, int lock); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index ceaa4e0..c17ad63 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -192,6 +192,8 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) GOTO(out_root, err); } + /* bug 2805 - set VM readahead to zero */ + vm_max_readahead = vm_min_readahead = 0; sb->s_root = d_alloc_root(root); RETURN(err); @@ -1053,6 +1055,9 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, LBUG(); } } + /* bug 2884 - limit i_blksize for broken user-space apps */ + LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize); + inode->i_blksize = min(lsm->lsm_xfersize, LL_MAX_BLKSIZE); if (lli->lli_smd != lsm) obd_free_memmd(ll_i2obdexp(inode), &lsm); } diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 58c9ed9..162f568 100644 --- a/lustre/llite/lproc_llite.c +++
b/lustre/llite/lproc_llite.c @@ -25,6 +25,7 @@ #include #include #include +#include #include "llite_internal.h" @@ -562,16 +563,16 @@ static int llite_dump_pgcache_seq_open(struct inode *inode, struct file *file) struct ll_sb_info *sbi = dp->data; int rc; - llap = kmalloc(sizeof(*llap), GFP_KERNEL); + OBD_ALLOC_GFP(llap, sizeof(*llap), GFP_KERNEL); if (llap == NULL) return -ENOMEM; llap->llap_page = NULL; llap->llap_cookie = sbi; llap->llap_magic = 0; - + rc = seq_open(file, &llite_dump_pgcache_seq_sops); if (rc) { - kfree(llap); + OBD_FREE(llap, sizeof(*llap)); return rc; } seq = file->private_data; @@ -584,7 +585,7 @@ static int llite_dump_pgcache_seq_open(struct inode *inode, struct file *file) return 0; } -static int llite_dump_pgcache_seq_release(struct inode *inode, +static int llite_dump_pgcache_seq_release(struct inode *inode, struct file *file) { struct seq_file *seq = file->private_data; @@ -595,7 +596,7 @@ static int llite_dump_pgcache_seq_release(struct inode *inode, if (!list_empty(&llap->llap_proc_item)) list_del_init(&llap->llap_proc_item); spin_unlock(&sbi->ll_pglist_lock); - kfree(llap); + OBD_FREE(llap, sizeof(*llap)); return seq_release(inode, file); } @@ -603,7 +604,7 @@ static int llite_dump_pgcache_seq_release(struct inode *inode, struct file_operations llite_dump_pgcache_fops = { .open = llite_dump_pgcache_seq_open, .read = seq_read, - .release = llite_dump_pgcache_seq_release, + .release = llite_dump_pgcache_seq_release, }; #endif /* LPROCFS */ diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index c9ee1db..f4f9666 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -530,18 +530,13 @@ static int ll_page_matches(struct page *page) page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT; page_extent.l_extent.end = page_extent.l_extent.start + PAGE_CACHE_SIZE - 1; - flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED; + flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK; matches = 
obd_match(ll_i2sbi(inode)->ll_osc_exp, ll_i2info(inode)->lli_smd, LDLM_EXTENT, - &page_extent, LCK_PR, &flags, inode, &match_lockh); - if (matches < 0) { + &page_extent, LCK_PR | LCK_PW, &flags, inode, + &match_lockh); + if (matches < 0) LL_CDEBUG_PAGE(page, "lock match failed\n"); - RETURN(matches); - } - if (matches) { - obd_cancel(ll_i2sbi(inode)->ll_osc_exp, - ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh); - } RETURN(matches); } @@ -565,7 +560,8 @@ static int ll_issue_page_read(struct obd_export *exp, } #define LL_RA_MIN(inode) ((unsigned long)PTL_MD_MAX_PAGES / 2) -#define LL_RA_MAX(inode) (inode->i_blksize * 3) +#define LL_RA_MAX(inode) ((ll_i2info(inode)->lli_smd->lsm_xfersize * 3) >> \ + PAGE_CACHE_SHIFT) static void ll_readahead(struct ll_readahead_state *ras, struct obd_export *exp, struct address_space *mapping, @@ -612,10 +608,14 @@ static void ll_readahead(struct ll_readahead_state *ras, if (page == NULL) break; + /* Don't try to readahead beyond the end of the lock extent */ + if (ll_page_matches(page) <= 0) + break; + /* the book-keeping above promises that we've tried * all the indices from start to end, so we don't * stop if anyone returns an error. This may not be good. */ - if (Page_Uptodate(page) || ll_page_matches(page) <= 0) + if (Page_Uptodate(page)) goto next_page; llap = llap_from_page(page); @@ -781,11 +781,20 @@ int ll_readpage(struct file *filp, struct page *page) if (rc == 0) { static unsigned long next_print; - CDEBUG(D_INODE, "didn't match a lock\n"); + CDEBUG(D_INODE, "ino %lu page %lu (%llu) didn't match a lock\n", + inode->i_ino, page->index, + (long long)page->index << PAGE_CACHE_SHIFT); if (time_after(jiffies, next_print)) { + CERROR("ino %lu page %lu (%llu) not covered by " + "a lock (mmap?). 
check debug logs.\n", + inode->i_ino, page->index, + (long long)page->index << PAGE_CACHE_SHIFT); + ldlm_dump_all_namespaces(); + if (next_print == 0) { + CERROR("%s\n", portals_debug_dumpstack()); + portals_debug_dumplog(); + } next_print = jiffies + 30 * HZ; - CERROR("not covered by a lock (mmap?). check debug " - "logs.\n"); } } diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c index d7f1784..7809366 100644 --- a/lustre/lov/lov_log.c +++ b/lustre/lov/lov_log.c @@ -92,7 +92,8 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count, struct llog_logid *logid, - struct llog_gen *gen) + struct llog_gen *gen, + struct obd_uuid *uuid) { struct obd_device *obd = ctxt->loc_obd; struct lov_obd *lov = &obd->u.lov; @@ -103,7 +104,11 @@ static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count, for (i = 0; i < lov->desc.ld_tgt_count; i++) { struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd; struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx); - rc = llog_connect(cctxt, 1, logid, gen); + + if (uuid && !obd_uuid_equals(uuid, &lov->tgts[i].uuid)) + continue; + + rc = llog_connect(cctxt, 1, logid, gen, uuid); if (rc) { CERROR("error osc_llog_connect %d\n", i); break; diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 7d657f2..92d862f 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -2201,8 +2201,11 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, break; } if (rc == 1) { - if (lsm->lsm_stripe_count > 1) + if (lsm->lsm_stripe_count > 1) { + if (*flags & LDLM_FL_TEST_LOCK) + lov_llh_destroy(lov_lockh); lov_llh_put(lov_lockh); + } RETURN(1); } @@ -2640,7 +2643,10 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, for (i = 0; i < lov->desc.ld_tgt_count; i++) { int er; - if (!lov->tgts[i].active) + if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid)) + continue; + + if (!val && !lov->tgts[i].active) 
continue; er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, vallen, diff --git a/lustre/lov/lov_pack.c b/lustre/lov/lov_pack.c index 6a4ac6b..1b40327 100644 --- a/lustre/lov/lov_pack.c +++ b/lustre/lov/lov_pack.c @@ -295,6 +295,7 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count, int pattern) (*lsmp)->lsm_magic = LOV_MAGIC; (*lsmp)->lsm_stripe_count = stripe_count; (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES * stripe_count; + (*lsmp)->lsm_xfersize = PTL_MTU * stripe_count; (*lsmp)->lsm_pattern = pattern; (*lsmp)->lsm_oinfo[0].loi_ost_idx = ~0; @@ -319,6 +320,7 @@ int lov_unpackmd_v0(struct lov_obd *lov, struct lov_stripe_md *lsm, lsm->lsm_object_id = le64_to_cpu(lmm->lmm_object_id); /* lsm->lsm_object_gr = 0; implicit */ lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size); + lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count; lsm->lsm_pattern = LOV_PATTERN_RAID0; ost_offset = le32_to_cpu(lmm->lmm_stripe_offset); ost_count = le16_to_cpu(lmm->lmm_ost_count); @@ -356,6 +358,7 @@ int lov_unpackmd_v1(struct lov_obd *lov, struct lov_stripe_md *lsm, lsm->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr); lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size); lsm->lsm_pattern = le32_to_cpu(lmm->lmm_pattern); + lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count; for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++) { /* XXX LOV STACKING call down to osc_unpackmd() */ @@ -496,6 +499,7 @@ int lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp, (*lsmp)->lsm_oinfo[0].loi_ost_idx = lum.lmm_stripe_offset; (*lsmp)->lsm_stripe_size = lum.lmm_stripe_size; + (*lsmp)->lsm_xfersize = lum.lmm_stripe_size * stripe_count; RETURN(0); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 9123e91..c692def 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -405,13 +405,23 @@ static int mdc_close_interpret(struct ptlrpc_request *req, void *data, int rc) { union 
ptlrpc_async_args *aa = data; struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0]; - - mdc_put_rpc_lock(rpc_lock, NULL); + struct obd_device *obd = aa->pointer_arg[1]; + + if (rpc_lock == NULL) { + CERROR("called with NULL rpc_lock\n"); + } else { + mdc_put_rpc_lock(rpc_lock, NULL); + LASSERTF(req->rq_async_args.pointer_arg[0] == + obd->u.cli.cl_rpc_lock, "%p != %p\n", + req->rq_async_args.pointer_arg[0], + obd->u.cli.cl_rpc_lock); + aa->pointer_arg[0] = NULL; + } wake_up(&req->rq_reply_waitq); RETURN(rc); } -/* We can't use ptlrpc_check_reply, because we don't want to wake up for +/* We can't use ptlrpc_check_reply, because we don't want to wake up for * anything but a reply or an error. */ static int mdc_close_check_reply(struct ptlrpc_request *req) { @@ -443,7 +453,6 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo, struct ptlrpc_request *req; struct mdc_open_data *mod; struct l_wait_info lwi; - struct mdc_rpc_lock *rpc_lock = obd->u.cli.cl_rpc_lock; ENTRY; req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize, @@ -478,9 +487,10 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo, /* We hand a ref to the rpcd here, so we need another one of our own. */ ptlrpc_request_addref(req); - mdc_get_rpc_lock(rpc_lock, NULL); + mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL); req->rq_interpret_reply = mdc_close_interpret; - req->rq_async_args.pointer_arg[0] = rpc_lock; + req->rq_async_args.pointer_arg[0] = obd->u.cli.cl_rpc_lock; + req->rq_async_args.pointer_arg[1] = obd; ptlrpcd_add_req(req); lwi = LWI_TIMEOUT_INTR(MAX(req->rq_timeout * HZ, 1), go_back_to_sleep, NULL, NULL); @@ -498,6 +508,11 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo, "close succeeded. 
Please tell CFS.\n"); } } + if (req->rq_async_args.pointer_arg[0] != NULL) { + CERROR("returned without dropping rpc_lock: rc %d\n", rc); + mdc_close_interpret(req, &req->rq_async_args, rc); + portals_debug_dumplog(); + } EXIT; out: diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index a8e7ff9..5a50482 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1485,7 +1485,8 @@ static int mds_postrecov(struct obd_device *obd) LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL); rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT), - obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL); + obd->u.mds.mds_lov_desc.ld_tgt_count, + NULL, NULL, NULL); if (rc != 0) { CERROR("faild at llog_origin_connect: %d\n", rc); } diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index 5c5fe7a..b8ce8b5 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -54,7 +54,8 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count, struct llog_logid *logid, - struct llog_gen *gen) + struct llog_gen *gen, + struct obd_uuid *uuid) { struct obd_device *obd = ctxt->loc_obd; struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; @@ -63,7 +64,7 @@ static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count, ENTRY; lctxt = llog_get_context(lov_obd, ctxt->loc_idx); - rc = llog_connect(lctxt, count, logid, gen); + rc = llog_connect(lctxt, count, logid, gen, uuid); RETURN(rc); } diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 97deb7d..0e9d2f0 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -280,7 +280,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) if (!obd->obd_recovering) { rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT), obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, - NULL); + NULL, NULL); if (rc != 0) CERROR("faild at llog_origin_connect: %d\n", rc); @@ -497,6 +497,7 @@ int mds_notify(struct obd_device *obd, 
struct obd_device *watched, int active) { struct obd_uuid *uuid; int rc = 0; + ENTRY; if (!active) RETURN(0); @@ -512,6 +513,21 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active) CWARN("MDS %s: in recovery, not resetting orphans on %s\n", obd->obd_name, uuid->uuid); } else { + LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL); + + rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), "mds_conn", + 0, uuid); + if (rc != 0) + RETURN(rc); + + rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT), + obd->u.mds.mds_lov_desc.ld_tgt_count, + NULL, NULL, uuid); + if (rc != 0) { + CERROR("faild at llog_origin_connect: %d\n", rc); + RETURN(rc); + } + CWARN("MDS %s: %s now active, resetting orphans\n", obd->obd_name, uuid->uuid); rc = mds_lov_clearorphans(&obd->u.mds, uuid); diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index e4146dc..0ad595f 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -66,9 +66,9 @@ void llog_free_handle(struct llog_handle *loghandle) if (!loghandle->lgh_hdr) goto out; - if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) + if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) list_del_init(&loghandle->u.phd.phd_entry); - if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_CAT) + if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) LASSERT(list_empty(&loghandle->u.chd.chd_head)); OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); @@ -97,10 +97,10 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index) RETURN(-EINVAL); } - llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) - 1); + llh->llh_count--; - if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) && - (le32_to_cpu(llh->llh_count) == 1) && + if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && + (llh->llh_count == 1) && (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) { rc = llog_destroy(loghandle); if (rc) @@ -131,10 +131,10 @@ int llog_init_handle(struct llog_handle 
*handle, int flags, RETURN(-ENOMEM); handle->lgh_hdr = llh; /* first assign flags to use llog_client_ops */ - llh->llh_flags = cpu_to_le32(flags); + llh->llh_flags = flags; rc = llog_read_header(handle); if (rc == 0) { - flags = le32_to_cpu(llh->llh_flags); + flags = llh->llh_flags; if (uuid) LASSERT(obd_uuid_equals(uuid, &llh->llh_tgtuuid)); GOTO(out, rc); @@ -146,21 +146,20 @@ int llog_init_handle(struct llog_handle *handle, int flags, rc = 0; handle->lgh_last_idx = 0; /* header is record with index 0 */ - llh->llh_count = cpu_to_le32(1); /* for the header record */ - llh->llh_hdr.lrh_type = cpu_to_le32(LLOG_HDR_MAGIC); - llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = - cpu_to_le32(LLOG_CHUNK_SIZE); + llh->llh_count = 1; /* for the header record */ + llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC; + llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE; llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0; - llh->llh_timestamp = cpu_to_le64(LTIME_S(CURRENT_TIME)); + llh->llh_timestamp = LTIME_S(CURRENT_TIME); if (uuid) memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid)); - llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh),llh_bitmap)); + llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap); ext2_set_bit(0, llh->llh_bitmap); out: if (flags & LLOG_F_IS_CAT) { INIT_LIST_HEAD(&handle->u.chd.chd_head); - llh->llh_size = cpu_to_le32(sizeof(struct llog_logid_rec)); + llh->llh_size = sizeof(struct llog_logid_rec); } else if (flags & LLOG_F_IS_PLAIN) INIT_LIST_HEAD(&handle->u.phd.phd_entry); @@ -235,11 +234,12 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb, GOTO(out, rc); rec = buf; - idx = le32_to_cpu(rec->lrh_index); + idx = rec->lrh_index; if (idx < index) CDEBUG(D_HA, "index %u : idx %u\n", index, idx); while (idx < index) { - rec = ((void *)rec + le32_to_cpu(rec->lrh_len)); + rec = (struct llog_rec_hdr *) + ((char *)rec + rec->lrh_len); idx ++; } @@ -266,7 +266,8 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb, 
++index; if (index > last_index) GOTO(out, rc = 0); - rec = ((void *)rec + le32_to_cpu(rec->lrh_len)); + rec = (struct llog_rec_hdr *) + ((char *)rec + rec->lrh_len); } } diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index cce91db..d4fa370 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -61,7 +61,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) index = (cathandle->lgh_last_idx + 1) % bitmap_size; /* maximum number of available slots in catlog is bitmap_size - 2 */ - if (llh->llh_cat_idx == cpu_to_le32(index)) { + if (llh->llh_cat_idx == index) { CERROR("no free catalog slots for log...\n"); RETURN(ERR_PTR(-ENOSPC)); } else { @@ -73,8 +73,8 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) LBUG(); /* should never happen */ } cathandle->lgh_last_idx = index; - llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1); - llh->llh_tail.lrt_index = cpu_to_le32(index); + llh->llh_count++; + llh->llh_tail.lrt_index = index; } rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL); @@ -91,12 +91,12 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, index, cathandle->lgh_id.lgl_oid); /* build the record for this log in the catalog */ - rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec)); - rec.lid_hdr.lrh_index = cpu_to_le32(index); - rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC); + rec.lid_hdr.lrh_len = sizeof(rec); + rec.lid_hdr.lrh_index = index; + rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC; rec.lid_id = loghandle->lgh_id; - rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec)); - rec.lid_tail.lrt_index = cpu_to_le32(index); + rec.lid_tail.lrt_len = sizeof(rec); + rec.lid_tail.lrt_index = index; /* update the catalog: header and record */ rc = llog_write_rec(cathandle, &rec.lid_hdr, @@ -105,7 +105,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle 
*cathandle) GOTO(out_destroy, rc); } - loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index); + loghandle->lgh_hdr->llh_cat_idx = index; cathandle->u.chd.chd_current_log = loghandle; LASSERT(list_empty(&loghandle->u.phd.phd_entry)); list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); @@ -163,8 +163,8 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, if (!rc) { loghandle->u.phd.phd_cat_handle = cathandle; loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id; - loghandle->u.phd.phd_cookie.lgc_index = - le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx); + loghandle->u.phd.phd_cookie.lgc_index = + loghandle->lgh_hdr->llh_cat_idx; } out: @@ -257,7 +257,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, int rc; ENTRY; - LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE); + LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); loghandle = llog_cat_current_log(cathandle, 1); if (IS_ERR(loghandle)) RETURN(PTR_ERR(loghandle)); @@ -336,13 +336,13 @@ int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, struct llog_handle *llh; int rc; - if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) { + if (rec->lrh_type != LLOG_LOGID_MAGIC) { CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, - le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid); + rec->lrh_index, cat_llh->lgh_id.lgl_oid); rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id); if (rc) { @@ -363,7 +363,7 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) int rc; ENTRY; - LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT)); + LASSERT(llh->llh_flags & LLOG_F_IS_CAT); d.lpd_data = data; d.lpd_cb = cb; @@ -371,7 +371,7 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) CWARN("catlog "LPX64" crosses index zero\n", cat_llh->lgh_id.lgl_oid); - cd.first_idx = 
le32_to_cpu(llh->llh_cat_idx); + cd.first_idx = llh->llh_cat_idx; cd.last_idx = 0; rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); if (rc != 0) @@ -395,17 +395,17 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index) ENTRY; bitmap_size = sizeof(llh->llh_bitmap) * 8; - if (llh->llh_cat_idx == cpu_to_le32(index - 1)) { - idx = le32_to_cpu(llh->llh_cat_idx) + 1; - llh->llh_cat_idx = cpu_to_le32(idx); + if (llh->llh_cat_idx == (index - 1)) { + idx = llh->llh_cat_idx + 1; + llh->llh_cat_idx = idx; if (idx == cathandle->lgh_last_idx) goto out; for (i = (index + 1) % bitmap_size; i != cathandle->lgh_last_idx; i = (i + 1) % bitmap_size) { if (!ext2_test_bit(i, llh->llh_bitmap)) { - idx = le32_to_cpu(llh->llh_cat_idx) + 1; - llh->llh_cat_idx = cpu_to_le32(idx); + idx = llh->llh_cat_idx + 1; + llh->llh_cat_idx = idx; } else if (i == 0) { llh->llh_cat_idx = 0; } else { @@ -414,7 +414,7 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index) } out: CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n", - cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx)); + cathandle->lgh_id.lgl_oid, llh->llh_cat_idx); } RETURN(0); diff --git a/lustre/obdclass/llog_ioctl.c b/lustre/obdclass/llog_ioctl.c index baa12ad..14d20f2 100644 --- a/lustre/obdclass/llog_ioctl.c +++ b/lustre/obdclass/llog_ioctl.c @@ -69,7 +69,7 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, char *endp; int cur_index, rc = 0; - cur_index = le32_to_cpu(rec->lrh_index); + cur_index = rec->lrh_index; if (ioc_data && (ioc_data->ioc_inllen1)) { l = 0; @@ -90,15 +90,15 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, if (to > 0 && cur_index > to) RETURN(-LLOG_EEMPTY); } - if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) { + if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) { struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; struct llog_handle *log_handle; - if (rec->lrh_type != 
cpu_to_le32(LLOG_LOGID_MAGIC)) { + if (rec->lrh_type != LLOG_LOGID_MAGIC) { l = snprintf(out, remains, "[index]: %05d [type]: " "%02x [len]: %04d failed\n", - cur_index, le32_to_cpu(rec->lrh_type), - le32_to_cpu(rec->lrh_len)); + cur_index, rec->lrh_type, + rec->lrh_len); } if (handle->lgh_ctxt == NULL) RETURN(-EOPNOTSUPP); @@ -106,7 +106,7 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, rc = llog_process(log_handle, llog_check_cb, NULL, NULL); llog_close(log_handle); } else { - switch (le32_to_cpu(rec->lrh_type)) { + switch (rec->lrh_type) { case OST_SZ_REC: case OST_RAID1_REC: case MDS_UNLINK_REC: @@ -115,8 +115,8 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, case LLOG_HDR_MAGIC: { l = snprintf(out, remains, "[index]: %05d [type]: " "%02x [len]: %04d ok\n", - cur_index, le32_to_cpu(rec->lrh_type), - le32_to_cpu(rec->lrh_len)); + cur_index, rec->lrh_type, + rec->lrh_len); out += l; remains -= l; if (remains <= 0) { @@ -128,8 +128,8 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, default: { l = snprintf(out, remains, "[index]: %05d [type]: " "%02x [len]: %04d failed\n", - cur_index, le32_to_cpu(rec->lrh_type), - le32_to_cpu(rec->lrh_len)); + cur_index, rec->lrh_type, + rec->lrh_len); out += l; remains -= l; if (remains <= 0) { @@ -168,15 +168,15 @@ static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, ioc_data->ioc_inllen1 = 0; } - cur_index = le32_to_cpu(rec->lrh_index); + cur_index = rec->lrh_index; if (cur_index < from) RETURN(0); if (to > 0 && cur_index > to) RETURN(-LLOG_EEMPTY); - if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) { + if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) { struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; - if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) { + if (rec->lrh_type != LLOG_LOGID_MAGIC) { CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } @@ -188,8 +188,8 @@ static 
int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, } else { l = snprintf(out, remains, "[index]: %05d [type]: %02x [len]: %04d\n", - cur_index, le32_to_cpu(rec->lrh_type), - le32_to_cpu(rec->lrh_len)); + cur_index, rec->lrh_type, + rec->lrh_len); } out += l; remains -= l; @@ -235,7 +235,7 @@ static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, struct llog_logid_rec *lir = (struct llog_logid_rec*)rec; int rc; - if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) + if (rec->lrh_type != LLOG_LOGID_MAGIC) return (-EINVAL); rc = llog_remove_log(handle, &lir->lid_id); @@ -283,10 +283,10 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) "last index: %d\n", handle->lgh_id.lgl_oid, handle->lgh_id.lgl_ogr, handle->lgh_id.lgl_ogen, - le32_to_cpu(handle->lgh_hdr->llh_flags), - le32_to_cpu(handle->lgh_hdr->llh_flags) & + handle->lgh_hdr->llh_flags, + handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT ? "cat" : "plain", - le32_to_cpu(handle->lgh_hdr->llh_count), + handle->lgh_hdr->llh_count, handle->lgh_last_idx); out += l; remains -= l; @@ -316,7 +316,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) struct llog_logid plain; char *endp; - if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))) + if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)) GOTO(out_close, err = -EINVAL); err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2); @@ -333,7 +333,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) case OBD_IOC_LLOG_REMOVE: { struct llog_logid plain; - if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))) + if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)) GOTO(out_close, err = -EINVAL); if (data->ioc_inlbuf2) { @@ -353,7 +353,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) out_close: if (handle->lgh_hdr && - handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) + 
handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) llog_cat_put(handle); else llog_close(handle); diff --git a/lustre/obdclass/llog_lvfs.c b/lustre/obdclass/llog_lvfs.c index d70d773..ad0b562 100644 --- a/lustre/obdclass/llog_lvfs.c +++ b/lustre/obdclass/llog_lvfs.c @@ -59,8 +59,8 @@ static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file, LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0); - tail.lrt_len = rec.lrh_len = cpu_to_le32(len); - tail.lrt_index = rec.lrh_index = cpu_to_le32(index); + tail.lrt_len = rec.lrh_len = len; + tail.lrt_index = rec.lrh_index = index; rec.lrh_type = 0; rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0); @@ -86,7 +86,7 @@ static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file, int rc; struct llog_rec_tail end; loff_t saved_off = file->f_pos; - int buflen = le32_to_cpu(rec->lrh_len); + int buflen = rec->lrh_len; ENTRY; file->f_pos = off; @@ -101,7 +101,7 @@ static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file, } /* the buf case */ - rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end)); + rec->lrh_len = sizeof(*rec) + buflen + sizeof(end); rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0); if (rc) { CERROR("error writing log hdr: rc %d\n", rc); @@ -165,7 +165,7 @@ static int llog_lvfs_read_header(struct llog_handle *handle) if (rc) CERROR("error reading log header\n"); - handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index); + handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index; handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size; RETURN(rc); @@ -179,11 +179,10 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, void *buf, int idx) { struct llog_log_hdr *llh; - int reclen = le32_to_cpu(rec->lrh_len), index, rc; + int reclen = rec->lrh_len, index, rc; struct llog_rec_tail *lrt; struct obd_device *obd; struct file *file; - loff_t offset; size_t left; ENTRY; @@ -217,7 +216,7 
@@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, if (rc || idx == 0) RETURN(rc); - saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len); + saved_offset = sizeof(*llh) + (idx-1)*rec->lrh_len; rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset); if (rc == 0 && reccookie) { reccookie->lgc_lgl = loghandle->lgh_id; @@ -236,7 +235,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, */ left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1)); if (buf) - reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) + + reclen = sizeof(*rec) + rec->lrh_len + sizeof(struct llog_rec_tail); /* NOTE: padding is a record, but no bit is set */ @@ -254,9 +253,10 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, loghandle->lgh_last_idx++; index = loghandle->lgh_last_idx; - rec->lrh_index = cpu_to_le32(index); + rec->lrh_index = index; if (buf == NULL) { - lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt); + lrt = (struct llog_rec_tail *) + ((char *)rec + rec->lrh_len - sizeof(*lrt)); lrt->lrt_len = rec->lrh_len; lrt->lrt_index = rec->lrh_index; } @@ -264,10 +264,9 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, CERROR("argh, index %u already set in log bitmap?\n", index); LBUG(); /* should never happen */ } - llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1); - llh->llh_tail.lrt_index = cpu_to_le32(index); + llh->llh_count++; + llh->llh_tail.lrt_index = index; - offset = 0; rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0); if (rc) RETURN(rc); @@ -277,21 +276,21 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, RETURN(rc); CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n", - loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len)); + loghandle->lgh_id.lgl_oid, index, rec->lrh_len); if (rc == 0 && reccookie) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = index; - if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC) + if 
(rec->lrh_type == MDS_UNLINK_REC) reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT; - else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC) + else if (rec->lrh_type == OST_SZ_REC) reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT; - else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC) + else if (rec->lrh_type == OST_RAID1_REC) reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT; else reccookie->lgc_subsys = -1; rc = 1; } - if (rc == 0 && le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC) + if (rc == 0 && rec->lrh_type == LLOG_GEN_REC) rc = 1; RETURN(rc); @@ -366,7 +365,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx, } tail = buf + rc - sizeof(struct llog_rec_tail); - *cur_idx = le32_to_cpu(tail->lrt_index); + *cur_idx = tail->lrt_index; /* this shouldn't happen */ if (tail->lrt_index == 0) { @@ -375,15 +374,15 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx, loghandle->lgh_id.lgl_ogen, *cur_offset); RETURN(-EINVAL); } - if (le32_to_cpu(tail->lrt_index) < next_idx) + if (tail->lrt_index < next_idx) continue; /* sanity check that the start of the new buffer is no farther * than the record that we wanted. This shouldn't happen. */ rec = buf; - if (le32_to_cpu(rec->lrh_index) > next_idx) { + if (rec->lrh_index > next_idx) { CERROR("missed desired record? 
%u > %u\n", - le32_to_cpu(rec->lrh_index), next_idx); + rec->lrh_index, next_idx); RETURN(-ENOENT); } RETURN(0); diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 446eb75..e9a9856 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -128,13 +128,13 @@ static int cat_cancel_cb(struct llog_handle *cathandle, int rc, index; ENTRY; - if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) { + if (rec->lrh_type != LLOG_LOGID_MAGIC) { CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, - le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid); + rec->lrh_index, cathandle->lgh_id.lgl_oid); rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id); if (rc) { @@ -144,8 +144,8 @@ static int cat_cancel_cb(struct llog_handle *cathandle, } llh = loghandle->lgh_hdr; - if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) && - (le32_to_cpu(llh->llh_count) == 1)) { + if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && + (llh->llh_count == 1)) { rc = llog_destroy(loghandle); if (rc) CERROR("failure destroying log in postsetup: %d\n", rc); @@ -160,7 +160,7 @@ static int cat_cancel_cb(struct llog_handle *cathandle, if (rc == 0) CWARN("cancel log "LPX64":%x at index %u of catalog " LPX64"\n", lir->lid_id.lgl_oid, - lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index), + lir->lid_id.lgl_ogen, rec->lrh_index, cathandle->lgh_id.lgl_oid); } @@ -233,9 +233,9 @@ int llog_obd_origin_cleanup(struct llog_ctxt *ctxt) &cathandle->u.chd.chd_head, u.phd.phd_entry) { llh = loghandle->lgh_hdr; - if ((le32_to_cpu(llh->llh_flags) & + if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && - (le32_to_cpu(llh->llh_count) == 1)) { + (llh->llh_count == 1)) { rc = llog_destroy(loghandle); if (rc) CERROR("failure destroying log during " diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 997f1f5..f8e6de1 100644 --- 
a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -62,9 +62,9 @@ static int verify_handle(char *test, struct llog_handle *llh, int num_recs) RETURN(-ERANGE); } - if (le32_to_cpu(llh->lgh_hdr->llh_count) != num_recs) { + if (llh->lgh_hdr->llh_count != num_recs) { CERROR("%s: handle->count is %d, expected %d after write\n", - test, le32_to_cpu(llh->lgh_hdr->llh_count), num_recs); + test, llh->lgh_hdr->llh_count, num_recs); RETURN(-ERANGE); } @@ -168,8 +168,8 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh) int num_recs = 1; /* 1 for the header */ ENTRY; - lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = cpu_to_le32(sizeof(lcr)); - lcr.lcr_hdr.lrh_type = cpu_to_le32(OST_SZ_REC); + lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = sizeof(lcr); + lcr.lcr_hdr.lrh_type = OST_SZ_REC; CWARN("3a: write one create_rec\n"); rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1); @@ -186,8 +186,8 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh) for (i = 0; i < 10; i++) { struct llog_rec_hdr hdr; char buf[8]; - hdr.lrh_len = cpu_to_le32(8); - hdr.lrh_type = cpu_to_le32(OBD_CFG_REC); + hdr.lrh_len = 8; + hdr.lrh_type = OBD_CFG_REC; memset(buf, 0, sizeof buf); rc = llog_write_rec(llh, &hdr, NULL, 0, buf, -1); if (rc) { @@ -237,9 +237,8 @@ static int llog_test_4(struct obd_device *obd) ENTRY; - lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = - cpu_to_le32(LLOG_MIN_REC_SIZE); - lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00); + lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE; + lmr.lmr_hdr.lrh_type = 0xf00f00; sprintf(name, "%x", llog_test_rand+1); CWARN("4a: create a catalog log with name: %s\n", name); @@ -294,8 +293,8 @@ static int llog_test_4(struct obd_device *obd) if (buf == NULL) GOTO(out, rc = -ENOMEM); for (i = 0; i < 5; i++) { - rec.lrh_len = cpu_to_le32(buflen); - rec.lrh_type = cpu_to_le32(OBD_CFG_REC); + rec.lrh_len = buflen; + rec.lrh_type = OBD_CFG_REC; rc = llog_cat_add_rec(cath, &rec, NULL, 
buf); if (rc) { CERROR("4e: write 5 records failed at #%d: %d\n", @@ -320,13 +319,13 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, { struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; - if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) { + if (rec->lrh_type != LLOG_LOGID_MAGIC) { CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } CWARN("seeing record at index %d - "LPX64":%x in log "LPX64"\n", - le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid, + rec->lrh_index, lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, llh->lgh_id.lgl_oid); RETURN(0); } @@ -334,13 +333,13 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, static int plain_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, void *data) { - if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) { + if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) { CERROR("log is not plain\n"); RETURN(-EINVAL); } CWARN("seeing record at index %d in log "LPX64"\n", - le32_to_cpu(rec->lrh_index), llh->lgh_id.lgl_oid); + rec->lrh_index, llh->lgh_id.lgl_oid); RETURN(0); } @@ -350,13 +349,13 @@ static int llog_cancel_rec_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, struct llog_cookie cookie; static int i = 0; - if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) { + if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) { CERROR("log is not plain\n"); RETURN(-EINVAL); } cookie.lgc_lgl = llh->lgh_id; - cookie.lgc_index = le32_to_cpu(rec->lrh_index); + cookie.lgc_index = rec->lrh_index; llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie); i++; @@ -378,9 +377,8 @@ static int llog_test_5(struct obd_device *obd) ENTRY; - lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = - cpu_to_le32(LLOG_MIN_REC_SIZE); - lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00); + lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE; + lmr.lmr_hdr.lrh_type = 0xf00f00; CWARN("5a: re-open catalog by id\n"); rc = llog_create(ctxt, &llh, &cat_logid, 
NULL); diff --git a/lustre/obdclass/lustre_handles.c b/lustre/obdclass/lustre_handles.c index 2339f28..0a6acfe 100644 --- a/lustre/obdclass/lustre_handles.c +++ b/lustre/obdclass/lustre_handles.c @@ -144,8 +144,8 @@ static void cleanup_all_handles(void) struct portals_handle *h; h = list_entry(tmp, struct portals_handle, h_link); - CERROR("forcing cleanup for handle "LPX64"\n", - h->h_cookie); + CERROR("force clean handle "LPX64" addr %p addref %p\n", + h->h_cookie, h, h->h_addref); class_handle_unhash_nolock(h); } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index e48332c..0e8e458 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1533,8 +1533,6 @@ static int filter_disconnect(struct obd_export *exp, int flags) exp->exp_flags = flags; spin_unlock_irqrestore(&exp->exp_lock, irqflags); - if (!(flags & OBD_OPT_FORCE)) - filter_grant_sanity_check(obd, __FUNCTION__); filter_grant_discard(exp); /* Disconnect early so that clients can't keep using export */ @@ -1542,8 +1540,6 @@ static int filter_disconnect(struct obd_export *exp, int flags) /* Do this twice in case a BRW arrived between the first call and * the class_export_unlink() call (bug 2663) */ - if (!(flags & OBD_OPT_FORCE)) - filter_grant_sanity_check(obd, __FUNCTION__); filter_grant_discard(exp); ldlm_cancel_locks_for_export(exp); @@ -2183,11 +2179,10 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, spin_unlock(&obd->obd_osfs_lock); CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64 - "pending "LPU64" free "LPU64" avail "LPU64"\n", - filter->fo_tot_dirty >> blockbits, - filter->fo_tot_granted >> blockbits, - filter->fo_tot_pending >> blockbits, - osfs->os_bfree, osfs->os_bavail); + " pending "LPU64" free "LPU64" avail "LPU64"\n", + filter->fo_tot_dirty, filter->fo_tot_granted, + filter->fo_tot_pending, + osfs->os_bfree << blockbits, osfs->os_bavail << blockbits); filter_grant_sanity_check(obd, __FUNCTION__); diff --git 
a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index 6edc2e1..6d3b80f 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -262,8 +262,10 @@ int osc_create(struct obd_export *exp, struct obdo *oa, rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc), &lwi); LASSERT(rc == 0 || rc == -ETIMEDOUT); - if (rc == -ETIMEDOUT) + if (rc == -ETIMEDOUT) { + CDEBUG(D_HA, "%p: timed out waiting for recovery\n", oscc); RETURN(rc); + } CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 11790b8..6858fe1 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -340,8 +340,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, LASSERT((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN); DEBUG_REQ(D_HA, request, - "delorphan from OST integration; level == RECOVER"); - request->rq_send_state = LUSTRE_IMP_RECOVER; + "delorphan from OST integration"); } rc = ptlrpc_queue_wait(request); @@ -2466,7 +2465,7 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, if (mode == LCK_PR) { rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, policy, LCK_PW, lockh); - if (rc == 1) { + if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) { /* FIXME: This is not incredibly elegant, but it might * be more elegant than adding another parameter to * lock_match. I want a second opinion. 
*/ diff --git a/lustre/portals/portals/api-init.c b/lustre/portals/portals/api-init.c index b811391..e2921ac 100644 --- a/lustre/portals/portals/api-init.c +++ b/lustre/portals/portals/api-init.c @@ -29,7 +29,7 @@ int ptl_init; unsigned int portal_subsystem_debug = ~0 - (S_PORTALS | S_QSWNAL | S_SOCKNAL | S_GMNAL | S_IBNAL); unsigned int portal_debug = (D_WARNING | D_DLMTRACE | D_ERROR | D_EMERG | D_HA | - D_RPCTRACE | D_VFSTRACE); + D_RPCTRACE | D_VFSTRACE | D_MALLOC); unsigned int portal_cerror = 1; unsigned int portal_printk; unsigned int portal_stack; diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 6ba3909..b1f8221 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -228,7 +228,6 @@ void request_in_callback(ptl_event_t *ev) list_add_tail(&req->rq_list, &service->srv_request_queue); service->srv_n_queued_reqs++; - rqbd->rqbd_eventcount++; /* NB everything can disappear under us once the request * has been queued and we unlock, so do the wake now... */ diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index eefae9c..dece441 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -331,7 +331,6 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request, } imp->imp_remote_handle = request->rq_repmsg->handle; IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL); - ptlrpc_pinger_add_import(imp); GOTO(finish, rc = 0); } @@ -499,11 +498,11 @@ int ptlrpc_import_recovery_state_machine(struct obd_import *imp) imp->imp_target_uuid.uuid, imp->imp_connection->c_remote_uuid.uuid); - ptlrpc_validate_import(imp); rc = ptlrpc_resend(imp); if (rc) GOTO(out, rc); IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL); + ptlrpc_validate_import(imp); } if (imp->imp_state == LUSTRE_IMP_FULL) { diff --git a/lustre/ptlrpc/llog_client.c b/lustre/ptlrpc/llog_client.c index 8accba6..d34e5e2 100644 --- a/lustre/ptlrpc/llog_client.c +++ b/lustre/ptlrpc/llog_client.c @@ -194,7 +194,7 @@ static int llog_client_read_header(struct llog_handle *handle) 
GOTO(out, rc =-EFAULT); } memcpy(handle->lgh_hdr, hdr, sizeof (*hdr)); - handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index); + handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index; out: if (req) diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c index cdd70e2..17be7dd 100644 --- a/lustre/ptlrpc/llog_net.c +++ b/lustre/ptlrpc/llog_net.c @@ -45,7 +45,8 @@ #ifdef __KERNEL__ int llog_origin_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen) + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid) { struct llog_gen_rec *lgr; struct obd_import *imp; @@ -108,7 +109,7 @@ int llog_handle_connect(struct ptlrpc_request *req) ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx); rc = llog_connect(ctxt, 1, &req_body->lgdc_logid, - &req_body->lgdc_gen); + &req_body->lgdc_gen, NULL); if (rc != 0) CERROR("failed at llog_relp_connect\n"); diff --git a/lustre/ptlrpc/llog_server.c b/lustre/ptlrpc/llog_server.c index 681c982..4236519 100644 --- a/lustre/ptlrpc/llog_server.c +++ b/lustre/ptlrpc/llog_server.c @@ -380,7 +380,7 @@ static int llog_catinfo_cb(struct llog_handle *cat, } ctxt = cbd->ctxt; - if (!(cat->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))) + if (!(cat->lgh_hdr->llh_flags & LLOG_F_IS_CAT)) RETURN(-EINVAL); lir = (struct llog_logid_rec *)rec; diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index d1a92f6..29b4e36 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -520,7 +520,7 @@ int ptl_send_rpc(struct ptlrpc_request *request) return rc; } -void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd) +int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd) { struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni; struct ptlrpc_service *service = srv_ni->sni_service; @@ -528,18 +528,20 @@ void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd) int rc; ptl_md_t md; ptl_handle_me_t me_h; - unsigned long flags; 
CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx."LPX64"\n", service->srv_req_portal, srv_ni->sni_ni->pni_name, srv_ni->sni_ni->pni_ni_h.nal_idx, srv_ni->sni_ni->pni_ni_h.cookie); + if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_RQBD)) + return (-ENOMEM); + rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal, match_id, 0, ~0, PTL_UNLINK, PTL_INS_AFTER, &me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); - GOTO (failed, NULL); + return (-ENOMEM); } LASSERT(rqbd->rqbd_refcount == 0); @@ -553,32 +555,15 @@ void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd) md.user_ptr = &rqbd->rqbd_cbid; md.eventq = srv_ni->sni_ni->pni_eq_h; - spin_lock_irqsave (&service->srv_lock, flags); - srv_ni->sni_nrqbd_receiving++; - spin_unlock_irqrestore (&service->srv_lock, flags); - rc = PtlMDAttach(me_h, md, PTL_UNLINK, &rqbd->rqbd_md_h); if (rc == PTL_OK) - return; - - CERROR("PtlMDAttach failed: %d\n", rc); + return (0); + + CERROR("PtlMDAttach failed: %d; \n", rc); LASSERT (rc == PTL_NOSPACE); rc = PtlMEUnlink (me_h); LASSERT (rc == PTL_OK); - - spin_lock_irqsave (&service->srv_lock, flags); - srv_ni->sni_nrqbd_receiving--; - if (srv_ni->sni_nrqbd_receiving == 0) { - /* This service is off-air on this interface because all - * its request buffers are busy. Portals will have started - * dropping incoming requests until more buffers get - * posted */ - CERROR("All %s %s request buffers busy\n", - service->srv_name, srv_ni->sni_ni->pni_name); - } - spin_unlock_irqrestore (&service->srv_lock, flags); - - failed: - LBUG(); /* BUG 1191 */ - /* put req on a retry list? 
*/ + rqbd->rqbd_refcount = 0; + + return (-ENOMEM); } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index b10f76d..d524021 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -808,13 +808,13 @@ void lustre_assert_wire_constants(void) (long long)LCK_EX); LASSERTF(LCK_PW == 2, " found %lld\n", (long long)LCK_PW); - LASSERTF(LCK_PR == 3, " found %lld\n", + LASSERTF(LCK_PR == 4, " found %lld\n", (long long)LCK_PR); - LASSERTF(LCK_CW == 4, " found %lld\n", + LASSERTF(LCK_CW == 8, " found %lld\n", (long long)LCK_CW); - LASSERTF(LCK_CR == 5, " found %lld\n", + LASSERTF(LCK_CR == 16, " found %lld\n", (long long)LCK_CR); - LASSERTF(LCK_NL == 6, " found %lld\n", + LASSERTF(LCK_NL == 32, " found %lld\n", (long long)LCK_NL); LASSERTF(PTLBD_QUERY == 200, " found %lld\n", (long long)PTLBD_QUERY); diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index d28b0b6..3d66eae 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -143,6 +143,11 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, LASSERT(ctxt); + if (ctxt->loc_imp == NULL) { + CWARN("no import for ctxt %p\n", ctxt); + RETURN(0); + } + if (count == 0 || cookies == NULL) { down(&ctxt->loc_sem); if (ctxt->loc_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW)) @@ -561,7 +566,8 @@ static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg) } int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen) + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid) { struct llog_canceld_ctxt *llcd; int rc; diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index e07cae9..2307d20 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -91,7 +91,7 @@ ptlrpc_alloc_rqbd (struct ptlrpc_srv_ni *srv_ni) } spin_lock_irqsave (&svc->srv_lock, flags); - list_add(&rqbd->rqbd_list, &srv_ni->sni_rqbds); + list_add(&rqbd->rqbd_list, 
&svc->srv_idle_rqbds); svc->srv_nbufs++; spin_unlock_irqrestore (&svc->srv_lock, flags); @@ -191,6 +191,56 @@ timeval_sub(struct timeval *large, struct timeval *small) (large->tv_usec - small->tv_usec); } +static int +ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc) +{ + struct ptlrpc_srv_ni *srv_ni; + struct ptlrpc_request_buffer_desc *rqbd; + unsigned long flags; + int rc; + + spin_lock_irqsave(&svc->srv_lock, flags); + if (list_empty (&svc->srv_idle_rqbds)) { + spin_unlock_irqrestore(&svc->srv_lock, flags); + return (0); + } + + rqbd = list_entry(svc->srv_idle_rqbds.next, + struct ptlrpc_request_buffer_desc, + rqbd_list); + list_del (&rqbd->rqbd_list); + + /* assume we will post successfully */ + srv_ni = rqbd->rqbd_srv_ni; + srv_ni->sni_nrqbd_receiving++; + list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds); + + spin_unlock_irqrestore(&svc->srv_lock, flags); + + rc = ptlrpc_register_rqbd(rqbd); + if (rc == 0) + return (1); + + spin_lock_irqsave(&svc->srv_lock, flags); + + srv_ni->sni_nrqbd_receiving--; + list_del(&rqbd->rqbd_list); + list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds); + + if (srv_ni->sni_nrqbd_receiving == 0) { + /* This service is off-air on this interface because all + * its request buffers are busy. 
Portals will have started + * dropping incoming requests until more buffers get + * posted */ + CERROR("All %s %s request buffers busy\n", + svc->srv_name, srv_ni->sni_ni->pni_name); + } + + spin_unlock_irqrestore (&svc->srv_lock, flags); + + return (-1); +} + struct ptlrpc_service * ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int req_portal, int rep_portal, @@ -227,6 +277,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, service->srv_handler = handler; INIT_LIST_HEAD(&service->srv_request_queue); + INIT_LIST_HEAD(&service->srv_idle_rqbds); INIT_LIST_HEAD(&service->srv_reply_queue); /* First initialise enough for early teardown */ @@ -235,7 +286,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, srv_ni->sni_service = service; srv_ni->sni_ni = &ptlrpc_interfaces[i]; - INIT_LIST_HEAD(&srv_ni->sni_rqbds); + INIT_LIST_HEAD(&srv_ni->sni_active_rqbds); INIT_LIST_HEAD(&srv_ni->sni_active_replies); } @@ -259,7 +310,12 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, srv_ni->sni_ni->pni_name); GOTO(failed, NULL); } - ptlrpc_register_rqbd (rqbd); + + /* We shouldn't be under memory pressure at + * startup, so fail if we can't post all our + * buffers at this time. 
*/ + if (ptlrpc_server_post_idle_rqbds(service) <= 0) + GOTO(failed, NULL); } } @@ -275,6 +331,26 @@ failed: return NULL; } +static void +ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *req) +{ + unsigned long flags; + int refcount; + + spin_lock_irqsave(&svc->srv_lock, flags); + svc->srv_n_active_reqs--; + refcount = --(req->rq_rqbd->rqbd_refcount); + if (refcount == 0) { + /* request buffer is now idle */ + list_del(&req->rq_rqbd->rqbd_list); + list_add_tail(&req->rq_rqbd->rqbd_list, + &svc->srv_idle_rqbds); + } + spin_unlock_irqrestore(&svc->srv_lock, flags); + + ptlrpc_free_server_req(req); +} + static int ptlrpc_server_handle_request (struct ptlrpc_service *svc) { @@ -283,7 +359,6 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc) struct timeval work_start; struct timeval work_end; long timediff; - int refcount; int rc; ENTRY; @@ -413,18 +488,8 @@ put_conn: } } - spin_lock_irqsave(&svc->srv_lock, flags); - svc->srv_n_active_reqs--; - refcount = --(request->rq_rqbd->rqbd_refcount); - spin_unlock_irqrestore(&svc->srv_lock, flags); - - if (refcount == 0) { - /* rqbd now idle: repost */ - ptlrpc_register_rqbd(request->rq_rqbd); - } - - ptlrpc_free_server_req(request); - + ptlrpc_server_free_request(svc, request); + RETURN(1); } @@ -529,6 +594,7 @@ int liblustre_check_services (void *arg) { int did_something = 0; + int rc; struct list_head *tmp, *nxt; ENTRY; @@ -548,12 +614,13 @@ liblustre_check_services (void *arg) svc->srv_nthreads++; - while (ptlrpc_server_handle_reply (svc)) - did_something++; - - while (ptlrpc_server_handle_request (svc)) - did_something++; - + do { + rc = ptlrpc_server_handle_reply(svc); + rc |= ptlrpc_server_handle_request(svc); + rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0); + did_something |= rc; + } while (rc); + svc->srv_nthreads--; } @@ -571,6 +638,15 @@ void ptlrpc_daemonize(void) reparent_to_init(); } +static int +ptlrpc_retry_rqbds(void *arg) +{ + struct ptlrpc_service *svc = (struct 
ptlrpc_service *)arg; + + svc->srv_rqbd_timeout = 0; + return (-ETIMEDOUT); +} + static int ptlrpc_main(void *arg) { struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg; @@ -603,10 +679,14 @@ static int ptlrpc_main(void *arg) while ((thread->t_flags & SVC_STOPPING) == 0 || svc->srv_n_difficult_replies != 0) { /* Don't exit while there are replies to be handled */ - struct l_wait_info lwi = { 0 }; - + struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout, + ptlrpc_retry_rqbds, svc); + l_wait_event_exclusive (svc->srv_waitq, - (thread->t_flags & SVC_STOPPING) != 0 || + ((thread->t_flags & SVC_STOPPING) != 0 && + svc->srv_n_difficult_replies == 0) || + (!list_empty(&svc->srv_idle_rqbds) && + svc->srv_rqbd_timeout == 0) || !list_empty (&svc->srv_reply_queue) || (!list_empty (&svc->srv_request_queue) && (svc->srv_n_difficult_replies == 0 || @@ -624,6 +704,14 @@ static int ptlrpc_main(void *arg) (svc->srv_n_difficult_replies == 0 || svc->srv_n_active_reqs < (svc->srv_nthreads - 1))) ptlrpc_server_handle_request (svc); + + if (!list_empty(&svc->srv_idle_rqbds) && + ptlrpc_server_post_idle_rqbds(svc) < 0) { + /* I just failed to repost request buffers. Wait + * for a timeout (unless something else happens) + * before I try again */ + svc->srv_rqbd_timeout = HZ/10; + } } spin_lock_irqsave(&svc->srv_lock, flags); @@ -756,8 +844,8 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) service->srv_name, srv_ni->sni_ni->pni_name); /* Unlink all the request buffers. 
This forces a 'final' - * event with its 'unlink' flag set for each rqbd */ - list_for_each(tmp, &srv_ni->sni_rqbds) { + * event with its 'unlink' flag set for each posted rqbd */ + list_for_each(tmp, &srv_ni->sni_active_rqbds) { struct ptlrpc_request_buffer_desc *rqbd = list_entry(tmp, struct ptlrpc_request_buffer_desc, rqbd_list); @@ -812,25 +900,27 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) list_del(&req->rq_list); service->srv_n_queued_reqs--; - req->rq_rqbd->rqbd_refcount--; - - ptlrpc_free_server_req(req); + service->srv_n_active_reqs++; + + ptlrpc_server_free_request(service, req); } LASSERT(service->srv_n_queued_reqs == 0); + LASSERT(service->srv_n_active_reqs == 0); - /* Now free all the request buffers since nothing references them - * any more... */ for (i = 0; i < ptlrpc_ninterfaces; i++) { srv_ni = &service->srv_interfaces[i]; + LASSERT(list_empty(&srv_ni->sni_active_rqbds)); + } - while (!list_empty(&srv_ni->sni_rqbds)) { - struct ptlrpc_request_buffer_desc *rqbd = - list_entry(srv_ni->sni_rqbds.next, - struct ptlrpc_request_buffer_desc, - rqbd_list); + /* Now free all the request buffers since nothing references them + * any more... 
*/ + while (!list_empty(&service->srv_idle_rqbds)) { + struct ptlrpc_request_buffer_desc *rqbd = + list_entry(service->srv_idle_rqbds.next, + struct ptlrpc_request_buffer_desc, + rqbd_list); - ptlrpc_free_rqbd(rqbd); - } + ptlrpc_free_rqbd(rqbd); } /* wait for all outstanding replies to complete (they were diff --git a/lustre/scripts/lustre.spec.in b/lustre/scripts/lustre.spec.in index 20d04c8..eea4316 100644 --- a/lustre/scripts/lustre.spec.in +++ b/lustre/scripts/lustre.spec.in @@ -1,5 +1,5 @@ # lustre.spec -%define version v1_2_0pre5 +%define version b_smallfix8 %define kversion @LINUXRELEASE@ %define linuxdir @LINUX@ %define enable_doc @ENABLE_DOC@ diff --git a/lustre/tests/replay-dual.sh b/lustre/tests/replay-dual.sh index f6395a4..8e10631 100755 --- a/lustre/tests/replay-dual.sh +++ b/lustre/tests/replay-dual.sh @@ -175,7 +175,6 @@ test_6() { } run_test 6 "open1, open2, unlink |X| close1 [fail mds] close2" - if [ "$ONLY" != "setup" ]; then equals_msg test complete, cleaning up cleanup diff --git a/lustre/tests/replay-ost-single.sh b/lustre/tests/replay-ost-single.sh index 0861045..bd109b9 100755 --- a/lustre/tests/replay-ost-single.sh +++ b/lustre/tests/replay-ost-single.sh @@ -47,23 +47,28 @@ fi build_test_filter -rm -f ostactive +SETUP=${SETUP:-"setup"} +CLEANUP=${CLEANUP:-"cleanup"} -gen_config +setup() { + gen_config -start ost --reformat $OSTLCONFARGS - -[ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE -start mds --reformat $MDSLCONFARGS -zconf_mount `hostname` $MOUNT + start ost --reformat $OSTLCONFARGS + [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE + start mds --reformat $MDSLCONFARGS + zconf_mount `hostname` $MOUNT +} mkdir -p $DIR +$SETUP + test_0() { fail ost cp /etc/profile $DIR/$tfile sync diff /etc/profile $DIR/$tfile + rm -f $DIR/$tfile } run_test 0 "empty replay" @@ -71,6 +76,7 @@ test_1() { date > $DIR/$tfile fail ost $CHECKSTAT -t file $DIR/$tfile || return 1 + rm -f $DIR/$tfile } run_test 1 
"touch" @@ -82,6 +88,7 @@ test_2() { for i in `seq 10`; do grep -q "tag-$i" $DIR/$tfile-$i || error "f2-$i" done + rm -f $DIR/$tfile-* } run_test 2 "|x| 10 open(O_CREAT)s" @@ -120,8 +127,56 @@ test_5() { sleep 10 fail ost wait $PID || return 1 + rm -f $DIR/$tfile } run_test 5 "Fail OST during iozone" +kbytesfree() { + cat /proc/fs/lustre/osc/OSC_*MNT*/kbytesfree | awk '{total+=$1} END {print total}' +} + +test_6() { + f=$DIR/$tfile + before=`kbytesfree` + dd if=/dev/urandom bs=1024 count=5120 of=$f +#define OBD_FAIL_MDS_REINT_NET_REP 0x119 + do_facet mds "sysctl -w lustre.fail_loc=0x80000119" + sync + after_dd=`kbytesfree` + echo "before: $before after_dd: $after_dd" + (( before > after_dd )) || return 1 + rm -f $f + fail ost + $CHECKSTAT -t file $f && return 2 || true + sync + # let the delete happen + sleep 2 + after=`kbytesfree` + echo "before: $before after: $after" + (( before == after )) || return 3 +} +run_test 6 "Fail OST before obd_destroy" + +test_7() { + f=$DIR/$tfile + before=`kbytesfree` + dd if=/dev/urandom bs=1024 count=5120 of=$f + sync + after_dd=`kbytesfree` + echo "before: $before after_dd: $after_dd" + (( before > after_dd )) || return 1 + replay_barrier ost + rm -f $f + fail ost + $CHECKSTAT -t file $f && return 2 || true + sync + # let the delete happen + sleep 2 + after=`kbytesfree` + echo "before: $before after: $after" + (( before == after )) || return 3 +} +run_test 7 "Fail OST before obd_destroy" + equals_msg test complete, cleaning up -cleanup +$CLEANUP diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 825a849..67595fc 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -797,6 +797,7 @@ test_40(){ } run_test 40 "cause recovery in ptlrpc, ensure IO continues" + #b=2814 # make sure that a read to one osc doesn't try to double-unlock its page just # because another osc is invalid. 
trigger_group_io used to mistakenly return @@ -804,22 +805,39 @@ run_test 40 "cause recovery in ptlrpc, ensure IO continues" # on valid oscs. This was fatal if the caller was ll_readpage who unlocked # the page, guaranteeing that the unlock from the RPC completion would # assert on trying to unlock the unlocked page. -test_41(){ - local f=$MOUNT/t42 +test_41() { + local f=$MOUNT/$tfile # make sure the start of the file is ost1 lfs setstripe $f $((128 * 1024)) 0 0 do_facet client dd if=/dev/zero of=$f bs=4k count=1 || return 3 cancel_lru_locks OSC # fail ost2 and read from ost1 - local osc2_dev=`../utils/lctl device_list | \ + local osc2_dev=`$LCTL device_list | \ awk '(/ost2.*client_facet/){print $4}' ` - lctl --device "\$"$osc2_dev deactivate + $LCTL --device %$osc2_dev deactivate do_facet client dd if=$f of=/dev/null bs=4k count=1 || return 3 - lctl --device "\$"$osc2_dev activate + $LCTL --device %$osc2_dev activate return 0 } run_test 41 "read from a valid osc while other oscs are invalid" +# test MDS recovery after ost failure +test_42() { + createmany -o $DIR/$tfile-%d 800 + replay_barrier ost + unlinkmany $DIR/$tfile-%d 0 400 + facet_failover ost + + # osc is evicted after + df $MOUNT && return 1 + df $MOUNT || return 2 + echo wait for MDS to timeout and recover + sleep $((TIMEOUT * 2)) + unlinkmany $DIR/$tfile-%d 400 400 + $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true +} +run_test 42 "recoery after ost failure" + equals_msg test complete, cleaning up $CLEANUP diff --git a/lustre/tests/runslabinfo b/lustre/tests/runslabinfo index eba407d..070a186 100755 --- a/lustre/tests/runslabinfo +++ b/lustre/tests/runslabinfo @@ -2,4 +2,5 @@ while sleep 1 ; do echo '-----------------------' egrep "ll_|ldlm|filp|dentry|inode|portals|size-[0-9]* " /proc/slabinfo + cat /proc/meminfo done diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 9b57fd2..84b645a 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -772,6 +772,19 @@ test_27j() 
{ } run_test 27j "lstripe with bad stripe offset (should return error)" +test_27k() { # bug 2844 + FILE=$DIR/d27/f27k + LL_MAX_BLKSIZE=$((4 * 1024 * 1024)) + [ ! -d $DIR/d27 ] && mkdir -p $DIR/d27 + $LSTRIPE $FILE 67108864 -1 0 || error "lstripe failed" + BLKSIZE=`stat $FILE | awk '/IO Block:/ { print $7 }'` + [ $BLKSIZE -le $LL_MAX_BLKSIZE ] || error "$BLKSIZE > $LL_MAX_BLKSIZE" + dd if=/dev/zero of=$FILE bs=4k count=1 + BLKSIZE=`stat $FILE | awk '/IO Block:/ { print $7 }'` + [ $BLKSIZE -le $LL_MAX_BLKSIZE ] || error "$BLKSIZE > $LL_MAX_BLKSIZE" +} +run_test 27k "limit i_blksize for broken user apps =============" + test_28() { mkdir $DIR/d28 $CREATETEST $DIR/d28/ct || error diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index 7a333af..b15e2bf 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -3,8 +3,8 @@ set -e ONLY=${ONLY:-"$*"} -# bug number for skipped test: 1557 -ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-"8"} +# bug number for skipped test: 1768 1557 +ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-"4 8 14b"} # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! [ "$ALWAYS_EXCEPT$EXCEPT" ] && echo "Skipping tests: $ALWAYS_EXCEPT $EXCEPT" diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index 4b054d5..b493c9c 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -454,6 +454,7 @@ cancel_lru_locks() { # Test interface error() { echo "${TESTSUITE}: **** FAIL:" $@ + log "FAIL: $@" exit 1 } @@ -514,6 +515,11 @@ equals_msg() { printf '===== %s %.*s\n' "$msg" $suffixlen $EQUALS } +log() { + echo "$*" + lctl mark "$*" 2> /dev/null || true +} + run_one() { testnum=$1 message=$2 @@ -523,6 +529,7 @@ run_one() { # Pretty tests run faster. equals_msg $testnum: $message + log "== test $1: $2" test_${testnum} || error "test_$testnum failed with $?" 
} diff --git a/lustre/utils/llmount.c b/lustre/utils/llmount.c index 4143d02..ce28e09 100644 --- a/lustre/utils/llmount.c +++ b/lustre/utils/llmount.c @@ -179,31 +179,25 @@ set_local(struct lustre_mount_data *lmd) return rc; } } else if (lmd->lmd_nal == QSWNAL) { -#if MULTIRAIL_EKC char *pfiles[] = {"/proc/qsnet/elan3/device0/position", "/proc/qsnet/elan4/device0/position", + "/proc/elan/device0/position", NULL}; -#else - char *pfiles[] = {"/proc/elan/device0/position", - NULL}; -#endif int i = 0; do { rc = get_local_elan_id(pfiles[i], buf); - } while (rc != 0 && - pfiles[++i] != NULL); + } while (rc != 0 && pfiles[++i] != NULL); if (rc != 0) { - fprintf(stderr, "mount: can't read elan ID" - " from /proc\n"); + fprintf(stderr, + "mount: can't read Elan ID from /proc\n"); return -1; } } if (ptl_parse_nid (&nid, buf) != 0) { - fprintf (stderr, "mount: can't parse NID %s\n", - buf); + fprintf (stderr, "mount: can't parse NID %s\n", buf); return (-1); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index f28a235..45a5a2b 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -168,13 +168,13 @@ void lustre_assert_wire_constants(void) (long long)LCK_EX); LASSERTF(LCK_PW == 2, " found %lld\n", (long long)LCK_PW); - LASSERTF(LCK_PR == 3, " found %lld\n", + LASSERTF(LCK_PR == 4, " found %lld\n", (long long)LCK_PR); - LASSERTF(LCK_CW == 4, " found %lld\n", + LASSERTF(LCK_CW == 8, " found %lld\n", (long long)LCK_CW); - LASSERTF(LCK_CR == 5, " found %lld\n", + LASSERTF(LCK_CR == 16, " found %lld\n", (long long)LCK_CR); - LASSERTF(LCK_NL == 6, " found %lld\n", + LASSERTF(LCK_NL == 32, " found %lld\n", (long long)LCK_NL); LASSERTF(PTLBD_QUERY == 200, " found %lld\n", (long long)PTLBD_QUERY);