X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ffid%2Ffid_request.c;h=7f96185703148aa1be75936f71e306044ea694dd;hp=14de931d5c24502a5d82c0dda2a5592b9dca7839;hb=e3d507eec50fc1ff79acf2a9f93d52d698c887d7;hpb=21f138ef4f2c95794e409304609ca39ea56aa1ae diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index 14de931..7f96185 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,13 +42,8 @@ #define DEBUG_SUBSYSTEM S_FID -#ifdef __KERNEL__ -# include -# include -#else /* __KERNEL__ */ -# include -#endif - +#include +#include #include #include #include @@ -81,7 +76,7 @@ static int seq_client_rpc(struct lu_client_seq *seq, /* Zero out input range, this is not recovery yet. */ in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); - range_init(in); + lu_seq_range_init(in); ptlrpc_request_set_replen(req); @@ -115,28 +110,26 @@ static int seq_client_rpc(struct lu_client_seq *seq, debug_mask = D_INFO; } - ptlrpc_at_set_req_timeout(req); + /* Allow seq client RPC during recovery time. */ + req->rq_allow_replay = 1; - if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA) - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + ptlrpc_at_set_req_timeout(req); rc = ptlrpc_queue_wait(req); - if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA) - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); if (rc) GOTO(out_req, rc); out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); *output = *out; - if (!range_is_sane(output)) { + if (!lu_seq_range_is_sane(output)) { CERROR("%s: Invalid range received from server: " DRANGE"\n", seq->lcs_name, PRANGE(output)); GOTO(out_req, rc = -EINVAL); } - if (range_is_exhausted(output)) { + if (lu_seq_range_is_exhausted(output)) { CERROR("%s: Range received from server is exhausted: " DRANGE"]\n", seq->lcs_name, PRANGE(output)); GOTO(out_req, rc = -EINVAL); @@ -213,14 +206,14 @@ static int seq_client_alloc_meta(const struct lu_env *env, /* Allocate new sequence for client. */ static int seq_client_alloc_seq(const struct lu_env *env, - struct lu_client_seq *seq, seqno_t *seqnr) + struct lu_client_seq *seq, u64 *seqnr) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(range_is_sane(&seq->lcs_space)); + LASSERT(lu_seq_range_is_sane(&seq->lcs_space)); - if (range_is_exhausted(&seq->lcs_space)) { + if (lu_seq_range_is_exhausted(&seq->lcs_space)) { rc = seq_client_alloc_meta(env, seq); if (rc) { CERROR("%s: Can't allocate new meta-sequence," @@ -234,9 +227,9 @@ static int seq_client_alloc_seq(const struct lu_env *env, rc = 0; } - LASSERT(!range_is_exhausted(&seq->lcs_space)); - *seqnr = seq->lcs_space.lsr_start; - seq->lcs_space.lsr_start += 1; + LASSERT(!lu_seq_range_is_exhausted(&seq->lcs_space)); + *seqnr = seq->lcs_space.lsr_start; + seq->lcs_space.lsr_start += 1; CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name, *seqnr); @@ -252,78 +245,96 @@ static int seq_fid_alloc_prep(struct lu_client_seq *seq, set_current_state(TASK_UNINTERRUPTIBLE); mutex_unlock(&seq->lcs_mutex); - waitq_wait(link, TASK_UNINTERRUPTIBLE); + schedule(); mutex_lock(&seq->lcs_mutex); remove_wait_queue(&seq->lcs_waitq, link); set_current_state(TASK_RUNNING); return -EAGAIN; } + ++seq->lcs_update; mutex_unlock(&seq->lcs_mutex); + return 0; } -static void seq_fid_alloc_fini(struct lu_client_seq *seq) +static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr, + bool whole) { LASSERT(seq->lcs_update == 1); + mutex_lock(&seq->lcs_mutex); + if (seqnr != 0) { + CDEBUG(D_INFO, "%s: New sequence [0x%16.16"LPF64"x]\n", + seq->lcs_name, seqnr); + + seq->lcs_fid.f_seq = seqnr; + if (whole) { + /* Since the caller require the whole seq, + * so marked this seq to be used */ + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + seq->lcs_fid.f_oid = + LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; + } else { + seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; + } + seq->lcs_fid.f_ver = 0; + } + --seq->lcs_update; - wake_up(&seq->lcs_waitq); + wake_up_all(&seq->lcs_waitq); } /** - * Allocate the whole seq to the caller. - **/ + * Allocate the whole non-used seq to the caller. + * + * \param[in] env pointer to the thread context + * \param[in,out] seq pointer to the client sequence manager + * \param[out] seqnr to hold the new allocated sequence + * + * \retval 0 for new sequence allocated. + * \retval Negative error number on failure. + */ int seq_client_get_seq(const struct lu_env *env, - struct lu_client_seq *seq, seqno_t *seqnr) + struct lu_client_seq *seq, u64 *seqnr) { wait_queue_t link; int rc; LASSERT(seqnr != NULL); - mutex_lock(&seq->lcs_mutex); - init_waitqueue_entry_current(&link); - while (1) { - rc = seq_fid_alloc_prep(seq, &link); - if (rc == 0) - break; - } - - rc = seq_client_alloc_seq(env, seq, seqnr); - if (rc) { - CERROR("%s: Can't allocate new sequence, " - "rc %d\n", seq->lcs_name, rc); - seq_fid_alloc_fini(seq); - mutex_unlock(&seq->lcs_mutex); - return rc; - } - - CDEBUG(D_INFO, "%s: allocate sequence " - "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); + mutex_lock(&seq->lcs_mutex); + init_waitqueue_entry(&link, current); - /* Since the caller require the whole seq, - * so marked this seq to be used */ - if (seq->lcs_type == LUSTRE_SEQ_METADATA) - seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; - else - seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; + /* To guarantee that we can get a whole non-used sequence. */ + while (seq_fid_alloc_prep(seq, &link) != 0); - seq->lcs_fid.f_seq = *seqnr; - seq->lcs_fid.f_ver = 0; - /* - * Inform caller that sequence switch is performed to allow it - * to setup FLD for it. - */ - seq_fid_alloc_fini(seq); + rc = seq_client_alloc_seq(env, seq, seqnr); + seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true); + if (rc) + CERROR("%s: Can't allocate new sequence: rc = %d\n", + seq->lcs_name, rc); mutex_unlock(&seq->lcs_mutex); - return rc; + return rc; } EXPORT_SYMBOL(seq_client_get_seq); -/* Allocate new fid on passed client @seq and save it to @fid. */ +/** + * Allocate new fid on passed client @seq and save it to @fid. + * + * \param[in] env pointer to the thread context + * \param[in,out] seq pointer to the client sequence manager + * \param[out] fid to hold the new allocated fid + * + * \retval 1 for notify the caller that sequence switch + * is performed to allow it to setup FLD for it. + * \retval 0 for new FID allocated in current sequence. + * \retval Negative error number on failure. + */ int seq_client_alloc_fid(const struct lu_env *env, struct lu_client_seq *seq, struct lu_fid *fid) { @@ -334,58 +345,50 @@ int seq_client_alloc_fid(const struct lu_env *env, LASSERT(seq != NULL); LASSERT(fid != NULL); - init_waitqueue_entry_current(&link); + init_waitqueue_entry(&link, current); mutex_lock(&seq->lcs_mutex); if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) seq->lcs_fid.f_oid = seq->lcs_width; - while (1) { - seqno_t seqnr; + while (1) { + u64 seqnr; - if (!fid_is_zero(&seq->lcs_fid) && - fid_oid(&seq->lcs_fid) < seq->lcs_width) { - /* Just bump last allocated fid and return to caller. */ - seq->lcs_fid.f_oid += 1; - rc = 0; - break; - } - - rc = seq_fid_alloc_prep(seq, &link); - if (rc) - continue; + if (unlikely(!fid_is_zero(&seq->lcs_fid) && + fid_oid(&seq->lcs_fid) < seq->lcs_width)) { + /* Just bump last allocated fid and return to caller. */ + seq->lcs_fid.f_oid++; + rc = 0; + break; + } - rc = seq_client_alloc_seq(env, seq, &seqnr); - if (rc) { - CERROR("%s: Can't allocate new sequence, " - "rc %d\n", seq->lcs_name, rc); - seq_fid_alloc_fini(seq); + /* Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid + * deadlock during seq_client_alloc_seq(). */ + rc = seq_fid_alloc_prep(seq, &link); + if (rc) + continue; + + rc = seq_client_alloc_seq(env, seq, &seqnr); + /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */ + seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false); + if (rc) { + CERROR("%s: Can't allocate new sequence: rc = %d\n", + seq->lcs_name, rc); mutex_unlock(&seq->lcs_mutex); - RETURN(rc); - } - CDEBUG(D_INFO, "%s: Switch to sequence " - "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr); - - seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; - seq->lcs_fid.f_seq = seqnr; - seq->lcs_fid.f_ver = 0; - - /* - * Inform caller that sequence switch is performed to allow it - * to setup FLD for it. - */ - rc = 1; + RETURN(rc); + } - seq_fid_alloc_fini(seq); - break; - } + rc = 1; + break; + } - *fid = seq->lcs_fid; + *fid = seq->lcs_fid; mutex_unlock(&seq->lcs_mutex); - CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); - RETURN(rc); + CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); + + RETURN(rc); } EXPORT_SYMBOL(seq_client_alloc_fid); @@ -398,7 +401,7 @@ void seq_client_flush(struct lu_client_seq *seq) wait_queue_t link; LASSERT(seq != NULL); - init_waitqueue_entry_current(&link); + init_waitqueue_entry(&link, current); mutex_lock(&seq->lcs_mutex); while (seq->lcs_update) { @@ -406,7 +409,7 @@ void seq_client_flush(struct lu_client_seq *seq) set_current_state(TASK_UNINTERRUPTIBLE); mutex_unlock(&seq->lcs_mutex); - waitq_wait(&link, TASK_UNINTERRUPTIBLE); + schedule(); mutex_lock(&seq->lcs_mutex); remove_wait_queue(&seq->lcs_waitq, &link); @@ -421,14 +424,14 @@ void seq_client_flush(struct lu_client_seq *seq) seq->lcs_space.lsr_index = -1; - range_init(&seq->lcs_space); + lu_seq_range_init(&seq->lcs_space); mutex_unlock(&seq->lcs_mutex); } EXPORT_SYMBOL(seq_client_flush); static void seq_client_proc_fini(struct lu_client_seq *seq) { -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS ENTRY; if (seq->lcs_proc_dir) { if (!IS_ERR(seq->lcs_proc_dir)) @@ -436,18 +439,17 @@ static void seq_client_proc_fini(struct lu_client_seq *seq) seq->lcs_proc_dir = NULL; } EXIT; -#endif /* LPROCFS */ +#endif /* CONFIG_PROC_FS */ } static int seq_client_proc_init(struct lu_client_seq *seq) { -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS int rc; ENTRY; - seq->lcs_proc_dir = lprocfs_seq_register(seq->lcs_name, - seq_type_proc_dir, - NULL, NULL); + seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, seq_type_proc_dir, + NULL, NULL); if (IS_ERR(seq->lcs_proc_dir)) { CERROR("%s: LProcFS failed in seq-init\n", seq->lcs_name); @@ -455,8 +457,7 @@ static int seq_client_proc_init(struct lu_client_seq *seq) RETURN(rc); } - rc = lprocfs_seq_add_vars(seq->lcs_proc_dir, - seq_client_proc_list, seq); + rc = lprocfs_add_vars(seq->lcs_proc_dir, seq_client_proc_list, seq); if (rc) { CERROR("%s: Can't init sequence manager " "proc, rc %d\n", seq->lcs_name, rc); @@ -469,9 +470,9 @@ out_cleanup: seq_client_proc_fini(seq); return rc; -#else /* LPROCFS */ +#else /* !CONFIG_PROC_FS */ return 0; -#endif +#endif /* CONFIG_PROC_FS */ } int seq_client_init(struct lu_client_seq *seq, @@ -575,14 +576,13 @@ int client_fid_fini(struct obd_device *obd) } EXPORT_SYMBOL(client_fid_fini); -#ifdef __KERNEL__ struct proc_dir_entry *seq_type_proc_dir; -static int __init fid_mod_init(void) +static int __init fid_init(void) { - seq_type_proc_dir = lprocfs_seq_register(LUSTRE_SEQ_NAME, - proc_lustre_root, - NULL, NULL); + seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME, + proc_lustre_root, + NULL, NULL); if (IS_ERR(seq_type_proc_dir)) return PTR_ERR(seq_type_proc_dir); @@ -593,7 +593,7 @@ static int __init fid_mod_init(void) return 0; } -static void __exit fid_mod_exit(void) +static void __exit fid_exit(void) { # ifdef HAVE_SERVER_SUPPORT fid_server_mod_exit(); @@ -605,9 +605,10 @@ static void __exit fid_mod_exit(void) } } -MODULE_AUTHOR("Sun Microsystems, Inc. "); -MODULE_DESCRIPTION("Lustre FID Module"); +MODULE_AUTHOR("OpenSFS, Inc. "); +MODULE_DESCRIPTION("Lustre File IDentifier"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); -cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit); -#endif /* __KERNEL__ */ +module_init(fid_init); +module_exit(fid_exit);