X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ffid%2Ffid_request.c;h=d0da85e5902557ad63f5479577abfc1f4ed33fa0;hb=c70512dfa35540a797b942c44c4f6e2b0daaba1c;hp=6a177b4e16a8021c799e6545cfcef051f6a37a96;hpb=684d830c4a6888850ea62ba3feef87dc191f9f5f;p=fs%2Flustre-release.git diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index 6a177b4..d0da85e 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -51,10 +51,7 @@ #include #include -#include -#include #include -#include #include /* mdc RPC locks */ #include @@ -87,34 +84,39 @@ static int seq_client_rpc(struct lu_client_seq *seq, ptlrpc_request_set_replen(req); - if (seq->lcs_type == LUSTRE_SEQ_METADATA) { - req->rq_request_portal = SEQ_METADATA_PORTAL; - in->lsr_flags = LU_SEQ_RANGE_MDT; - } else { - LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA, - "unknown lcs_type %u\n", seq->lcs_type); - req->rq_request_portal = SEQ_DATA_PORTAL; - in->lsr_flags = LU_SEQ_RANGE_OST; - } + in->lsr_index = seq->lcs_space.lsr_index; + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + fld_range_set_mdt(in); + else + fld_range_set_ost(in); if (opc == SEQ_ALLOC_SUPER) { - /* Update index field of *in, it is required for - * FLD update on super sequence allocator node. */ - in->lsr_index = seq->lcs_space.lsr_index; req->rq_request_portal = SEQ_CONTROLLER_PORTAL; + req->rq_reply_portal = MDC_REPLY_PORTAL; + /* During allocating super sequence for data object, + * the current thread might hold the export of MDT0(MDT0 + * precreating objects on this OST), and it will send the + * request to MDT0 here, so we can not keep resending the + * request here, otherwise if MDT0 is failed(umounted), + * it can not release the export of MDT0 */ + if (seq->lcs_type == LUSTRE_SEQ_DATA) + req->rq_no_delay = req->rq_no_resend = 1; debug_mask = D_CONSOLE; } else { + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + req->rq_request_portal = SEQ_METADATA_PORTAL; + else + req->rq_request_portal = SEQ_DATA_PORTAL; debug_mask = D_INFO; - LASSERTF(opc == SEQ_ALLOC_META, - "unknown opcode %u\n, opc", opc); } ptlrpc_at_set_req_timeout(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); if (rc) GOTO(out_req, rc); @@ -149,21 +151,28 @@ int seq_client_alloc_super(struct lu_client_seq *seq, int rc; ENTRY; - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); -#ifdef __KERNEL__ if (seq->lcs_srv) { +#ifdef HAVE_SEQ_SERVER LASSERT(env != NULL); rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space, env); - } else { +#else + rc = 0; #endif - rc = seq_client_rpc(seq, &seq->lcs_space, + } else { + /* Check whether the connection to seq controller has been + * setup (lcs_exp != NULL) */ + if (seq->lcs_exp == NULL) { + mutex_unlock(&seq->lcs_mutex); + RETURN(-EINPROGRESS); + } + + rc = seq_client_rpc(seq, &seq->lcs_space, SEQ_ALLOC_SUPER, "super"); -#ifdef __KERNEL__ } -#endif - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } @@ -174,17 +183,24 @@ static int seq_client_alloc_meta(const struct lu_env *env, int rc; ENTRY; -#ifdef __KERNEL__ if (seq->lcs_srv) { +#ifdef HAVE_SEQ_SERVER LASSERT(env != NULL); rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env); - } else { +#else + rc = 0; #endif - rc = seq_client_rpc(seq, &seq->lcs_space, - SEQ_ALLOC_META, "meta"); -#ifdef __KERNEL__ + } else { + do { + /* If meta server return -EINPROGRESS or EAGAIN, + * it means meta server might not be ready to + * allocate super sequence from sequence controller + * (MDT0)yet */ + rc = seq_client_rpc(seq, &seq->lcs_space, + SEQ_ALLOC_META, "meta"); + } while (rc == -EINPROGRESS || rc == -EAGAIN); } -#endif + RETURN(rc); } @@ -227,29 +243,31 @@ static int seq_fid_alloc_prep(struct lu_client_seq *seq, if (seq->lcs_update) { cfs_waitq_add(&seq->lcs_waitq, link); cfs_set_current_state(CFS_TASK_UNINT); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); cfs_waitq_wait(link, CFS_TASK_UNINT); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); cfs_waitq_del(&seq->lcs_waitq, link); cfs_set_current_state(CFS_TASK_RUNNING); return -EAGAIN; } ++seq->lcs_update; - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); return 0; } static void seq_fid_alloc_fini(struct lu_client_seq *seq) { LASSERT(seq->lcs_update == 1); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); --seq->lcs_update; cfs_waitq_signal(&seq->lcs_waitq); } -/* Allocate the whole seq to the caller*/ +/** + * Allocate the whole seq to the caller. + **/ int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq, seqno_t *seqnr) { @@ -257,7 +275,7 @@ int seq_client_get_seq(const struct lu_env *env, int rc; LASSERT(seqnr != NULL); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); cfs_waitlink_init(&link); while (1) { @@ -271,25 +289,28 @@ int seq_client_get_seq(const struct lu_env *env, CERROR("%s: Can't allocate new sequence, " "rc %d\n", seq->lcs_name, rc); seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); return rc; } CDEBUG(D_INFO, "%s: allocate sequence " "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); - /*Since the caller require the whole seq, - *so marked this seq to be used*/ - seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH; - seq->lcs_fid.f_seq = *seqnr; - seq->lcs_fid.f_ver = 0; + /* Since the caller require the whole seq, + * so marked this seq to be used */ + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; + seq->lcs_fid.f_seq = *seqnr; + seq->lcs_fid.f_ver = 0; /* * Inform caller that sequence switch is performed to allow it * to setup FLD for it. */ seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); return rc; } @@ -307,7 +328,10 @@ int seq_client_alloc_fid(const struct lu_env *env, LASSERT(fid != NULL); cfs_waitlink_init(&link); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); + + if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) + seq->lcs_fid.f_oid = seq->lcs_width; while (1) { seqno_t seqnr; @@ -329,7 +353,7 @@ int seq_client_alloc_fid(const struct lu_env *env, CERROR("%s: Can't allocate new sequence, " "rc %d\n", seq->lcs_name, rc); seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } @@ -351,7 +375,7 @@ int seq_client_alloc_fid(const struct lu_env *env, } *fid = seq->lcs_fid; - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); RETURN(rc); @@ -368,16 +392,16 @@ void seq_client_flush(struct lu_client_seq *seq) LASSERT(seq != NULL); cfs_waitlink_init(&link); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); while (seq->lcs_update) { cfs_waitq_add(&seq->lcs_waitq, &link); cfs_set_current_state(CFS_TASK_UNINT); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); cfs_waitq_wait(&link, CFS_TASK_UNINT); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); cfs_waitq_del(&seq->lcs_waitq, &link); cfs_set_current_state(CFS_TASK_RUNNING); } @@ -391,15 +415,26 @@ void seq_client_flush(struct lu_client_seq *seq) seq->lcs_space.lsr_index = -1; range_init(&seq->lcs_space); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); } EXPORT_SYMBOL(seq_client_flush); -static void seq_client_proc_fini(struct lu_client_seq *seq); - +static void seq_client_proc_fini(struct lu_client_seq *seq) +{ #ifdef LPROCFS + ENTRY; + if (seq->lcs_proc_dir) { + if (!IS_ERR(seq->lcs_proc_dir)) + lprocfs_remove(&seq->lcs_proc_dir); + seq->lcs_proc_dir = NULL; + } + EXIT; +#endif /* LPROCFS */ +} + static int seq_client_proc_init(struct lu_client_seq *seq) { +#ifdef LPROCFS int rc; ENTRY; @@ -427,29 +462,11 @@ static int seq_client_proc_init(struct lu_client_seq *seq) out_cleanup: seq_client_proc_fini(seq); return rc; -} -static void seq_client_proc_fini(struct lu_client_seq *seq) -{ - ENTRY; - if (seq->lcs_proc_dir) { - if (!IS_ERR(seq->lcs_proc_dir)) - lprocfs_remove(&seq->lcs_proc_dir); - seq->lcs_proc_dir = NULL; - } - EXIT; -} -#else -static int seq_client_proc_init(struct lu_client_seq *seq) -{ - return 0; -} - -static void seq_client_proc_fini(struct lu_client_seq *seq) -{ - return; -} +#else /* LPROCFS */ + return 0; #endif +} int seq_client_init(struct lu_client_seq *seq, struct obd_export *exp, @@ -457,36 +474,37 @@ int seq_client_init(struct lu_client_seq *seq, const char *prefix, struct lu_server_seq *srv) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(seq != NULL); - LASSERT(prefix != NULL); + LASSERT(seq != NULL); + LASSERT(prefix != NULL); - seq->lcs_exp = exp; - seq->lcs_srv = srv; - seq->lcs_type = type; - cfs_mutex_init(&seq->lcs_mutex); - seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH; - cfs_waitq_init(&seq->lcs_waitq); + seq->lcs_srv = srv; + seq->lcs_type = type; - /* Make sure that things are clear before work is started. */ - seq_client_flush(seq); + mutex_init(&seq->lcs_mutex); + if (type == LUSTRE_SEQ_METADATA) + seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; - if (exp == NULL) { - LASSERT(seq->lcs_srv != NULL); - } else { - LASSERT(seq->lcs_exp != NULL); - seq->lcs_exp = class_export_get(seq->lcs_exp); - } + cfs_waitq_init(&seq->lcs_waitq); + /* Make sure that things are clear before work is started. */ + seq_client_flush(seq); - snprintf(seq->lcs_name, sizeof(seq->lcs_name), - "cli-%s", prefix); + if (exp != NULL) + seq->lcs_exp = class_export_get(exp); + else if (type == LUSTRE_SEQ_METADATA) + LASSERT(seq->lcs_srv != NULL); - rc = seq_client_proc_init(seq); - if (rc) - seq_client_fini(seq); - RETURN(rc); + snprintf(seq->lcs_name, sizeof(seq->lcs_name), + "cli-%s", prefix); + + rc = seq_client_proc_init(seq); + if (rc) + seq_client_fini(seq); + RETURN(rc); } EXPORT_SYMBOL(seq_client_init); @@ -505,3 +523,87 @@ void seq_client_fini(struct lu_client_seq *seq) EXIT; } EXPORT_SYMBOL(seq_client_fini); + +int client_fid_init(struct obd_device *obd, + struct obd_export *exp, enum lu_cli_type type) +{ + struct client_obd *cli = &obd->u.cli; + char *prefix; + int rc; + ENTRY; + + OBD_ALLOC_PTR(cli->cl_seq); + if (cli->cl_seq == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(prefix, MAX_OBD_NAME + 5); + if (prefix == NULL) + GOTO(out_free_seq, rc = -ENOMEM); + + snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name); + + /* Init client side sequence-manager */ + rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL); + OBD_FREE(prefix, MAX_OBD_NAME + 5); + if (rc) + GOTO(out_free_seq, rc); + + RETURN(rc); +out_free_seq: + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + return rc; +} +EXPORT_SYMBOL(client_fid_init); + +int client_fid_fini(struct obd_device *obd) +{ + struct client_obd *cli = &obd->u.cli; + ENTRY; + + if (cli->cl_seq != NULL) { + seq_client_fini(cli->cl_seq); + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + + RETURN(0); +} +EXPORT_SYMBOL(client_fid_fini); + +#ifdef __KERNEL__ +struct proc_dir_entry *seq_type_proc_dir; + +static int __init fid_mod_init(void) +{ + seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME, + proc_lustre_root, + NULL, NULL); + if (IS_ERR(seq_type_proc_dir)) + return PTR_ERR(seq_type_proc_dir); + +# ifdef HAVE_SERVER_SUPPORT + fid_server_mod_init(); +# endif + + return 0; +} + +static void __exit fid_mod_exit(void) +{ +# ifdef HAVE_SERVER_SUPPORT + fid_server_mod_exit(); +# endif + + if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) { + lprocfs_remove(&seq_type_proc_dir); + seq_type_proc_dir = NULL; + } +} + +MODULE_AUTHOR("Sun Microsystems, Inc. "); +MODULE_DESCRIPTION("Lustre FID Module"); +MODULE_LICENSE("GPL"); + +cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit); +#endif /* __KERNEL__ */