X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ffid%2Ffid_request.c;h=a8ca08733fd78933ad274e96d3feaa1108686971;hp=bc0f46bee2b6346b7953b0eabc505cd1ce049c9b;hb=7404a3e355eeed661c6ee669c05d9e6a24ed592f;hpb=7cbd1b0c7409b4bfb44dc6869afc4ff9c90a7dce;ds=sidebyside diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index bc0f46b..a8ca087 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,24 +38,18 @@ #define DEBUG_SUBSYSTEM S_FID -#ifdef __KERNEL__ -# include -# include -#else /* __KERNEL__ */ -# include -#endif - +#include +#include #include #include -#include -#include #include -#include #include /* mdc RPC locks */ #include #include "fid_internal.h" +struct dentry *seq_debugfs_dir; + static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *output, __u32 opc, const char *opcname) @@ -72,6 +62,7 @@ static int seq_client_rpc(struct lu_client_seq *seq, int rc; ENTRY; + LASSERT(exp != NULL && !IS_ERR(exp)); req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, LUSTRE_MDS_VERSION, SEQ_QUERY); if (req == NULL) @@ -83,37 +74,46 @@ static int seq_client_rpc(struct lu_client_seq *seq, /* Zero out input range, this is not recovery yet. */ in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); - range_init(in); + lu_seq_range_init(in); ptlrpc_request_set_replen(req); - if (seq->lcs_type == LUSTRE_SEQ_METADATA) { - req->rq_request_portal = SEQ_METADATA_PORTAL; - in->lsr_flags = LU_SEQ_RANGE_MDT; - } else { - LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA, - "unknown lcs_type %u\n", seq->lcs_type); - req->rq_request_portal = SEQ_DATA_PORTAL; - in->lsr_flags = LU_SEQ_RANGE_OST; - } + in->lsr_index = seq->lcs_space.lsr_index; + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + fld_range_set_mdt(in); + else + fld_range_set_ost(in); if (opc == SEQ_ALLOC_SUPER) { - /* Update index field of *in, it is required for - * FLD update on super sequence allocator node. */ - in->lsr_index = seq->lcs_space.lsr_index; req->rq_request_portal = SEQ_CONTROLLER_PORTAL; + req->rq_reply_portal = MDC_REPLY_PORTAL; + /* During allocating super sequence for data object, + * the current thread might hold the export of MDT0(MDT0 + * precreating objects on this OST), and it will send the + * request to MDT0 here, so we can not keep resending the + * request here, otherwise if MDT0 is failed(umounted), + * it can not release the export of MDT0 */ + if (seq->lcs_type == LUSTRE_SEQ_DATA) + req->rq_no_delay = req->rq_no_resend = 1; debug_mask = D_CONSOLE; } else { + if (seq->lcs_type == LUSTRE_SEQ_METADATA) { + req->rq_reply_portal = MDC_REPLY_PORTAL; + req->rq_request_portal = SEQ_METADATA_PORTAL; + } else { + req->rq_reply_portal = OSC_REPLY_PORTAL; + req->rq_request_portal = SEQ_DATA_PORTAL; + } + debug_mask = D_INFO; - LASSERTF(opc == SEQ_ALLOC_META, - "unknown opcode %u\n, opc", opc); } + /* Allow seq client RPC during recovery time. */ + req->rq_allow_replay = 1; + ptlrpc_at_set_req_timeout(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); if (rc) GOTO(out_req, rc); @@ -121,13 +121,13 @@ static int seq_client_rpc(struct lu_client_seq *seq, out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); *output = *out; - if (!range_is_sane(output)) { + if (!lu_seq_range_is_sane(output)) { CERROR("%s: Invalid range received from server: " DRANGE"\n", seq->lcs_name, PRANGE(output)); GOTO(out_req, rc = -EINVAL); } - if (range_is_exhausted(output)) { + if (lu_seq_range_is_exhausted(output)) { CERROR("%s: Range received from server is exhausted: " DRANGE"]\n", seq->lcs_name, PRANGE(output)); GOTO(out_req, rc = -EINVAL); @@ -149,21 +149,28 @@ int seq_client_alloc_super(struct lu_client_seq *seq, int rc; ENTRY; - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); -#ifdef __KERNEL__ if (seq->lcs_srv) { +#ifdef HAVE_SEQ_SERVER LASSERT(env != NULL); rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space, env); - } else { +#else + rc = 0; #endif - rc = seq_client_rpc(seq, &seq->lcs_space, + } else { + /* Check whether the connection to seq controller has been + * setup (lcs_exp != NULL) */ + if (seq->lcs_exp == NULL) { + mutex_unlock(&seq->lcs_mutex); + RETURN(-EINPROGRESS); + } + + rc = seq_client_rpc(seq, &seq->lcs_space, SEQ_ALLOC_SUPER, "super"); -#ifdef __KERNEL__ } -#endif - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } @@ -171,37 +178,56 @@ int seq_client_alloc_super(struct lu_client_seq *seq, static int seq_client_alloc_meta(const struct lu_env *env, struct lu_client_seq *seq) { - int rc; - ENTRY; + int rc; + ENTRY; -#ifdef __KERNEL__ - if (seq->lcs_srv) { - LASSERT(env != NULL); - rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env); - } else { -#endif - rc = seq_client_rpc(seq, &seq->lcs_space, - SEQ_ALLOC_META, "meta"); -#ifdef __KERNEL__ - } + if (seq->lcs_srv) { +#ifdef HAVE_SEQ_SERVER + LASSERT(env); + rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env); +#else + rc = 0; #endif - RETURN(rc); + } else { + do { + /* If meta server return -EINPROGRESS or EAGAIN, + * it means meta server might not be ready to + * allocate super sequence from sequence controller + * (MDT0)yet */ + rc = seq_client_rpc(seq, &seq->lcs_space, + SEQ_ALLOC_META, "meta"); + if (rc == -EINPROGRESS || rc == -EAGAIN) { + wait_queue_head_t waitq; + struct l_wait_info lwi; + + /* MDT0 is not ready, let's wait for 2 + * seconds and retry. */ + init_waitqueue_head(&waitq); + lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, + NULL); + l_wait_event(waitq, 0, &lwi); + } + } while (rc == -EINPROGRESS || rc == -EAGAIN); + } + + RETURN(rc); } /* Allocate new sequence for client. */ static int seq_client_alloc_seq(const struct lu_env *env, - struct lu_client_seq *seq, seqno_t *seqnr) + struct lu_client_seq *seq, u64 *seqnr) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(range_is_sane(&seq->lcs_space)); + LASSERT(lu_seq_range_is_sane(&seq->lcs_space)); - if (range_is_exhausted(&seq->lcs_space)) { + if (lu_seq_range_is_exhausted(&seq->lcs_space)) { rc = seq_client_alloc_meta(env, seq); if (rc) { - CERROR("%s: Can't allocate new meta-sequence," - "rc %d\n", seq->lcs_name, rc); + if (rc != -EINPROGRESS) + CERROR("%s: Can't allocate new meta-sequence," + "rc = %d\n", seq->lcs_name, rc); RETURN(rc); } else { CDEBUG(D_INFO, "%s: New range - "DRANGE"\n", @@ -211,153 +237,169 @@ static int seq_client_alloc_seq(const struct lu_env *env, rc = 0; } - LASSERT(!range_is_exhausted(&seq->lcs_space)); - *seqnr = seq->lcs_space.lsr_start; - seq->lcs_space.lsr_start += 1; + LASSERT(!lu_seq_range_is_exhausted(&seq->lcs_space)); + *seqnr = seq->lcs_space.lsr_start; + seq->lcs_space.lsr_start += 1; - CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name, + CDEBUG(D_INFO, "%s: Allocated sequence [%#llx]\n", seq->lcs_name, *seqnr); RETURN(rc); } static int seq_fid_alloc_prep(struct lu_client_seq *seq, - cfs_waitlink_t *link) + wait_queue_entry_t *link) { - if (seq->lcs_update) { - cfs_waitq_add(&seq->lcs_waitq, link); - cfs_set_current_state(CFS_TASK_UNINT); - cfs_mutex_unlock(&seq->lcs_mutex); + if (seq->lcs_update) { + add_wait_queue(&seq->lcs_waitq, link); + set_current_state(TASK_UNINTERRUPTIBLE); + mutex_unlock(&seq->lcs_mutex); - cfs_waitq_wait(link, CFS_TASK_UNINT); + schedule(); - cfs_mutex_lock(&seq->lcs_mutex); - cfs_waitq_del(&seq->lcs_waitq, link); - cfs_set_current_state(CFS_TASK_RUNNING); - return -EAGAIN; - } - ++seq->lcs_update; - cfs_mutex_unlock(&seq->lcs_mutex); - return 0; + mutex_lock(&seq->lcs_mutex); + remove_wait_queue(&seq->lcs_waitq, link); + set_current_state(TASK_RUNNING); + return -EAGAIN; + } + + ++seq->lcs_update; + mutex_unlock(&seq->lcs_mutex); + + return 0; } -static void seq_fid_alloc_fini(struct lu_client_seq *seq) +static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr, + bool whole) { - LASSERT(seq->lcs_update == 1); - cfs_mutex_lock(&seq->lcs_mutex); - --seq->lcs_update; - cfs_waitq_signal(&seq->lcs_waitq); + LASSERT(seq->lcs_update == 1); + + mutex_lock(&seq->lcs_mutex); + if (seqnr != 0) { + CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n", + seq->lcs_name, seqnr); + + seq->lcs_fid.f_seq = seqnr; + if (whole) { + /* Since the caller require the whole seq, + * so marked this seq to be used */ + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + seq->lcs_fid.f_oid = + LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; + } else { + seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; + } + seq->lcs_fid.f_ver = 0; + } + + --seq->lcs_update; + wake_up_all(&seq->lcs_waitq); } -/* Allocate the whole seq to the caller*/ +/** + * Allocate the whole non-used seq to the caller. + * + * \param[in] env pointer to the thread context + * \param[in,out] seq pointer to the client sequence manager + * \param[out] seqnr to hold the new allocated sequence + * + * \retval 0 for new sequence allocated. + * \retval Negative error number on failure. + */ int seq_client_get_seq(const struct lu_env *env, - struct lu_client_seq *seq, seqno_t *seqnr) + struct lu_client_seq *seq, u64 *seqnr) { - cfs_waitlink_t link; - int rc; - - LASSERT(seqnr != NULL); - cfs_mutex_lock(&seq->lcs_mutex); - cfs_waitlink_init(&link); + wait_queue_entry_t link; + int rc; - while (1) { - rc = seq_fid_alloc_prep(seq, &link); - if (rc == 0) - break; - } - - rc = seq_client_alloc_seq(env, seq, seqnr); - if (rc) { - CERROR("%s: Can't allocate new sequence, " - "rc %d\n", seq->lcs_name, rc); - seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); - return rc; - } + LASSERT(seqnr != NULL); - CDEBUG(D_INFO, "%s: allocate sequence " - "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); + mutex_lock(&seq->lcs_mutex); + init_waitqueue_entry(&link, current); - /*Since the caller require the whole seq, - *so marked this seq to be used*/ - seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH; - seq->lcs_fid.f_seq = *seqnr; - seq->lcs_fid.f_ver = 0; + /* To guarantee that we can get a whole non-used sequence. */ + while (seq_fid_alloc_prep(seq, &link) != 0); - /* - * Inform caller that sequence switch is performed to allow it - * to setup FLD for it. - */ - seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); + rc = seq_client_alloc_seq(env, seq, seqnr); + seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true); + if (rc) + CERROR("%s: Can't allocate new sequence: rc = %d\n", + seq->lcs_name, rc); + mutex_unlock(&seq->lcs_mutex); - return rc; + return rc; } EXPORT_SYMBOL(seq_client_get_seq); -/* Allocate new fid on passed client @seq and save it to @fid. */ +/** + * Allocate new fid on passed client @seq and save it to @fid. + * + * \param[in] env pointer to the thread context + * \param[in,out] seq pointer to the client sequence manager + * \param[out] fid to hold the new allocated fid + * + * \retval 1 for notify the caller that sequence switch + * is performed to allow it to setup FLD for it. + * \retval 0 for new FID allocated in current sequence. + * \retval Negative error number on failure. + */ int seq_client_alloc_fid(const struct lu_env *env, - struct lu_client_seq *seq, struct lu_fid *fid) + struct lu_client_seq *seq, struct lu_fid *fid) { - cfs_waitlink_t link; - int rc; - ENTRY; + wait_queue_entry_t link; + int rc; + ENTRY; - LASSERT(seq != NULL); - LASSERT(fid != NULL); + LASSERT(seq != NULL); + LASSERT(fid != NULL); - cfs_waitlink_init(&link); - cfs_mutex_lock(&seq->lcs_mutex); + init_waitqueue_entry(&link, current); + mutex_lock(&seq->lcs_mutex); if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) seq->lcs_fid.f_oid = seq->lcs_width; - while (1) { - seqno_t seqnr; - - if (!fid_is_zero(&seq->lcs_fid) && - fid_oid(&seq->lcs_fid) < seq->lcs_width) { - /* Just bump last allocated fid and return to caller. */ - seq->lcs_fid.f_oid += 1; - rc = 0; - break; - } - - rc = seq_fid_alloc_prep(seq, &link); - if (rc) - continue; - - rc = seq_client_alloc_seq(env, seq, &seqnr); - if (rc) { - CERROR("%s: Can't allocate new sequence, " - "rc %d\n", seq->lcs_name, rc); - seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); - RETURN(rc); - } - - CDEBUG(D_INFO, "%s: Switch to sequence " - "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr); - - seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; - seq->lcs_fid.f_seq = seqnr; - seq->lcs_fid.f_ver = 0; - - /* - * Inform caller that sequence switch is performed to allow it - * to setup FLD for it. - */ - rc = 1; + while (1) { + u64 seqnr; + + if (unlikely(!fid_is_zero(&seq->lcs_fid) && + fid_oid(&seq->lcs_fid) < seq->lcs_width)) { + /* Just bump last allocated fid and return to caller. */ + seq->lcs_fid.f_oid++; + rc = 0; + break; + } + + /* Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid + * deadlock during seq_client_alloc_seq(). */ + rc = seq_fid_alloc_prep(seq, &link); + if (rc) + continue; + + rc = seq_client_alloc_seq(env, seq, &seqnr); + /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */ + seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false); + if (rc) { + if (rc != -EINPROGRESS) + CERROR("%s: Can't allocate new sequence: " + "rc = %d\n", seq->lcs_name, rc); + mutex_unlock(&seq->lcs_mutex); + + RETURN(rc); + } + + rc = 1; + break; + } - seq_fid_alloc_fini(seq); - break; - } + *fid = seq->lcs_fid; + mutex_unlock(&seq->lcs_mutex); - *fid = seq->lcs_fid; - cfs_mutex_unlock(&seq->lcs_mutex); + CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); - CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); - RETURN(rc); + RETURN(rc); } EXPORT_SYMBOL(seq_client_alloc_fid); @@ -367,23 +409,23 @@ EXPORT_SYMBOL(seq_client_alloc_fid); */ void seq_client_flush(struct lu_client_seq *seq) { - cfs_waitlink_t link; + wait_queue_entry_t link; - LASSERT(seq != NULL); - cfs_waitlink_init(&link); - cfs_mutex_lock(&seq->lcs_mutex); + LASSERT(seq != NULL); + init_waitqueue_entry(&link, current); + mutex_lock(&seq->lcs_mutex); - while (seq->lcs_update) { - cfs_waitq_add(&seq->lcs_waitq, &link); - cfs_set_current_state(CFS_TASK_UNINT); - cfs_mutex_unlock(&seq->lcs_mutex); + while (seq->lcs_update) { + add_wait_queue(&seq->lcs_waitq, &link); + set_current_state(TASK_UNINTERRUPTIBLE); + mutex_unlock(&seq->lcs_mutex); - cfs_waitq_wait(&link, CFS_TASK_UNINT); + schedule(); - cfs_mutex_lock(&seq->lcs_mutex); - cfs_waitq_del(&seq->lcs_waitq, &link); - cfs_set_current_state(CFS_TASK_RUNNING); - } + mutex_lock(&seq->lcs_mutex); + remove_wait_queue(&seq->lcs_waitq, &link); + set_current_state(TASK_RUNNING); + } fid_zero(&seq->lcs_fid); /** @@ -393,66 +435,62 @@ void seq_client_flush(struct lu_client_seq *seq) seq->lcs_space.lsr_index = -1; - range_init(&seq->lcs_space); - cfs_mutex_unlock(&seq->lcs_mutex); + lu_seq_range_init(&seq->lcs_space); + mutex_unlock(&seq->lcs_mutex); } EXPORT_SYMBOL(seq_client_flush); -static void seq_client_proc_fini(struct lu_client_seq *seq); +static void seq_client_debugfs_fini(struct lu_client_seq *seq) +{ + if (!IS_ERR_OR_NULL(seq->lcs_debugfs_entry)) + ldebugfs_remove(&seq->lcs_debugfs_entry); +} -#ifdef LPROCFS -static int seq_client_proc_init(struct lu_client_seq *seq) +static int seq_client_debugfs_init(struct lu_client_seq *seq) { int rc; - ENTRY; - seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, - seq_type_proc_dir, - NULL, NULL); - - if (IS_ERR(seq->lcs_proc_dir)) { - CERROR("%s: LProcFS failed in seq-init\n", - seq->lcs_name); - rc = PTR_ERR(seq->lcs_proc_dir); - RETURN(rc); + seq->lcs_debugfs_entry = ldebugfs_register(seq->lcs_name, + seq_debugfs_dir, + NULL, NULL); + if (IS_ERR_OR_NULL(seq->lcs_debugfs_entry)) { + CERROR("%s: LdebugFS failed in seq-init\n", seq->lcs_name); + rc = seq->lcs_debugfs_entry ? PTR_ERR(seq->lcs_debugfs_entry) + : -ENOMEM; + seq->lcs_debugfs_entry = NULL; + RETURN(rc); } - rc = lprocfs_add_vars(seq->lcs_proc_dir, - seq_client_proc_list, seq); - if (rc) { - CERROR("%s: Can't init sequence manager " - "proc, rc %d\n", seq->lcs_name, rc); + rc = ldebugfs_add_vars(seq->lcs_debugfs_entry, + seq_client_debugfs_list, seq); + if (rc) { + CERROR("%s: Can't init sequence manager debugfs, rc %d\n", + seq->lcs_name, rc); GOTO(out_cleanup, rc); } RETURN(0); out_cleanup: - seq_client_proc_fini(seq); + seq_client_debugfs_fini(seq); return rc; } -static void seq_client_proc_fini(struct lu_client_seq *seq) -{ - ENTRY; - if (seq->lcs_proc_dir) { - if (!IS_ERR(seq->lcs_proc_dir)) - lprocfs_remove(&seq->lcs_proc_dir); - seq->lcs_proc_dir = NULL; - } - EXIT; -} -#else -static int seq_client_proc_init(struct lu_client_seq *seq) +void seq_client_fini(struct lu_client_seq *seq) { - return 0; -} + ENTRY; -static void seq_client_proc_fini(struct lu_client_seq *seq) -{ - return; + seq_client_debugfs_fini(seq); + + if (seq->lcs_exp != NULL) { + class_export_put(seq->lcs_exp); + seq->lcs_exp = NULL; + } + + seq->lcs_srv = NULL; + EXIT; } -#endif +EXPORT_SYMBOL(seq_client_fini); int seq_client_init(struct lu_client_seq *seq, struct obd_export *exp, @@ -460,51 +498,118 @@ int seq_client_init(struct lu_client_seq *seq, const char *prefix, struct lu_server_seq *srv) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(seq != NULL); - LASSERT(prefix != NULL); + LASSERT(seq != NULL); + LASSERT(prefix != NULL); - seq->lcs_exp = exp; - seq->lcs_srv = srv; - seq->lcs_type = type; - cfs_mutex_init(&seq->lcs_mutex); - seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH; - cfs_waitq_init(&seq->lcs_waitq); + seq->lcs_srv = srv; + seq->lcs_type = type; - /* Make sure that things are clear before work is started. */ - seq_client_flush(seq); + mutex_init(&seq->lcs_mutex); + if (type == LUSTRE_SEQ_METADATA) + seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; - if (exp == NULL) { - LASSERT(seq->lcs_srv != NULL); - } else { - LASSERT(seq->lcs_exp != NULL); - seq->lcs_exp = class_export_get(seq->lcs_exp); - } + init_waitqueue_head(&seq->lcs_waitq); + /* Make sure that things are clear before work is started. */ + seq_client_flush(seq); - snprintf(seq->lcs_name, sizeof(seq->lcs_name), - "cli-%s", prefix); + if (exp != NULL) + seq->lcs_exp = class_export_get(exp); - rc = seq_client_proc_init(seq); - if (rc) - seq_client_fini(seq); - RETURN(rc); + snprintf(seq->lcs_name, sizeof(seq->lcs_name), + "cli-%s", prefix); + + rc = seq_client_debugfs_init(seq); + if (rc) + seq_client_fini(seq); + RETURN(rc); } EXPORT_SYMBOL(seq_client_init); -void seq_client_fini(struct lu_client_seq *seq) +int client_fid_init(struct obd_device *obd, + struct obd_export *exp, enum lu_cli_type type) { - ENTRY; + struct client_obd *cli = &obd->u.cli; + char *prefix; + int rc; + ENTRY; - seq_client_proc_fini(seq); + down_write(&cli->cl_seq_rwsem); + OBD_ALLOC_PTR(cli->cl_seq); + if (!cli->cl_seq) + GOTO(out, rc = -ENOMEM); - if (seq->lcs_exp != NULL) { - class_export_put(seq->lcs_exp); - seq->lcs_exp = NULL; - } + OBD_ALLOC(prefix, MAX_OBD_NAME + 5); + if (!prefix) + GOTO(out, rc = -ENOMEM); + + snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name); + + /* Init client side sequence-manager */ + rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL); + OBD_FREE(prefix, MAX_OBD_NAME + 5); + + GOTO(out, rc); - seq->lcs_srv = NULL; - EXIT; +out: + if (rc && cli->cl_seq) { + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + up_write(&cli->cl_seq_rwsem); + + return rc; } -EXPORT_SYMBOL(seq_client_fini); +EXPORT_SYMBOL(client_fid_init); + +int client_fid_fini(struct obd_device *obd) +{ + struct client_obd *cli = &obd->u.cli; + ENTRY; + + down_write(&cli->cl_seq_rwsem); + if (cli->cl_seq) { + seq_client_fini(cli->cl_seq); + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + up_write(&cli->cl_seq_rwsem); + + RETURN(0); +} +EXPORT_SYMBOL(client_fid_fini); + +static int __init fid_init(void) +{ +#ifdef HAVE_SERVER_SUPPORT + int rc = fid_server_mod_init(); + + if (rc) + return rc; +#endif + seq_debugfs_dir = ldebugfs_register(LUSTRE_SEQ_NAME, + debugfs_lustre_root, + NULL, NULL); + return PTR_ERR_OR_ZERO(seq_debugfs_dir); +} + +static void __exit fid_exit(void) +{ +# ifdef HAVE_SERVER_SUPPORT + fid_server_mod_exit(); +# endif + if (!IS_ERR_OR_NULL(seq_debugfs_dir)) + ldebugfs_remove(&seq_debugfs_dir); +} + +MODULE_AUTHOR("OpenSFS, Inc. "); +MODULE_DESCRIPTION("Lustre File IDentifier"); +MODULE_VERSION(LUSTRE_VERSION_STRING); +MODULE_LICENSE("GPL"); + +module_init(fid_init); +module_exit(fid_exit);