X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ffid%2Ffid_request.c;h=3cd7a485cec245cc8f8894e646d562c76be7d0e3;hp=426635887d254789949ed83b0e8999d156dbe7dd;hb=1ecf5bd5a3bece3d615d037a332792e360b49a09;hpb=6869932b552ac705f411de3362f01bd50c1f6f7d diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index 4266358..3cd7a48 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,8 +24,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -40,167 +40,183 @@ * Author: Yury Umanets */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_FID -#ifdef __KERNEL__ -# include -# include -#else /* __KERNEL__ */ -# include -#endif - +#include +#include #include #include -#include -#include #include -#include #include /* mdc RPC locks */ #include #include "fid_internal.h" -static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input, - struct lu_range *output, __u32 opc, +static int seq_client_rpc(struct lu_client_seq *seq, + struct lu_seq_range *output, __u32 opc, const char *opcname) { - struct obd_export *exp = seq->lcs_exp; - struct ptlrpc_request *req; - struct lu_range *out, *in; - __u32 *op; - int rc; - ENTRY; - - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, - LUSTRE_MDS_VERSION, SEQ_QUERY); - if (req == NULL) - RETURN(-ENOMEM); - - /* Init operation code */ - op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); - *op = opc; - - /* Zero out input range, this is not recovery yet. */ - in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); - if (input != NULL) - *in = *input; - else - range_zero(in); - - ptlrpc_request_set_replen(req); - - if (seq->lcs_type == LUSTRE_SEQ_METADATA) { - req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ? - SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL; - } else { - req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ? - SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL; - } - ptlrpc_at_set_req_timeout(req); - - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - - if (rc) - GOTO(out_req, rc); - - out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); - *output = *out; - - if (!range_is_sane(output)) { - CERROR("%s: Invalid range received from server: " - DRANGE"\n", seq->lcs_name, PRANGE(output)); - GOTO(out_req, rc = -EINVAL); - } - - if (range_is_exhausted(output)) { - CERROR("%s: Range received from server is exhausted: " - DRANGE"]\n", seq->lcs_name, PRANGE(output)); - GOTO(out_req, rc = -EINVAL); - } - *in = *out; - - CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n", - seq->lcs_name, opcname, PRANGE(output)); - - EXIT; + struct obd_export *exp = seq->lcs_exp; + struct ptlrpc_request *req; + struct lu_seq_range *out, *in; + __u32 *op; + unsigned int debug_mask; + int rc; + ENTRY; + + LASSERT(exp != NULL && !IS_ERR(exp)); + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, + LUSTRE_MDS_VERSION, SEQ_QUERY); + if (req == NULL) + RETURN(-ENOMEM); + + /* Init operation code */ + op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); + *op = opc; + + /* Zero out input range, this is not recovery yet. */ + in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); + lu_seq_range_init(in); + + ptlrpc_request_set_replen(req); + + in->lsr_index = seq->lcs_space.lsr_index; + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + fld_range_set_mdt(in); + else + fld_range_set_ost(in); + + if (opc == SEQ_ALLOC_SUPER) { + req->rq_request_portal = SEQ_CONTROLLER_PORTAL; + req->rq_reply_portal = MDC_REPLY_PORTAL; + /* During allocating super sequence for data object, + * the current thread might hold the export of MDT0(MDT0 + * precreating objects on this OST), and it will send the + * request to MDT0 here, so we can not keep resending the + * request here, otherwise if MDT0 is failed(umounted), + * it can not release the export of MDT0 */ + if (seq->lcs_type == LUSTRE_SEQ_DATA) + req->rq_no_delay = req->rq_no_resend = 1; + debug_mask = D_CONSOLE; + } else { + if (seq->lcs_type == LUSTRE_SEQ_METADATA) { + req->rq_reply_portal = MDC_REPLY_PORTAL; + req->rq_request_portal = SEQ_METADATA_PORTAL; + } else { + req->rq_reply_portal = OSC_REPLY_PORTAL; + req->rq_request_portal = SEQ_DATA_PORTAL; + } + + debug_mask = D_INFO; + } + + /* Allow seq client RPC during recovery time. */ + req->rq_allow_replay = 1; + + ptlrpc_at_set_req_timeout(req); + + rc = ptlrpc_queue_wait(req); + + if (rc) + GOTO(out_req, rc); + + out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); + *output = *out; + + if (!lu_seq_range_is_sane(output)) { + CERROR("%s: Invalid range received from server: " + DRANGE"\n", seq->lcs_name, PRANGE(output)); + GOTO(out_req, rc = -EINVAL); + } + + if (lu_seq_range_is_exhausted(output)) { + CERROR("%s: Range received from server is exhausted: " + DRANGE"]\n", seq->lcs_name, PRANGE(output)); + GOTO(out_req, rc = -EINVAL); + } + + CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n", + seq->lcs_name, opcname, PRANGE(output)); + + EXIT; out_req: - ptlrpc_req_finished(req); - return rc; + ptlrpc_req_finished(req); + return rc; } /* Request sequence-controller node to allocate new super-sequence. */ -int seq_client_replay_super(struct lu_client_seq *seq, - struct lu_range *range, - const struct lu_env *env) +int seq_client_alloc_super(struct lu_client_seq *seq, + const struct lu_env *env) { int rc; ENTRY; - down(&seq->lcs_sem); + mutex_lock(&seq->lcs_mutex); -#ifdef __KERNEL__ if (seq->lcs_srv) { +#ifdef HAVE_SEQ_SERVER LASSERT(env != NULL); - rc = seq_server_alloc_super(seq->lcs_srv, range, - &seq->lcs_space, env); - } else { + rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space, + env); +#else + rc = 0; #endif - rc = seq_client_rpc(seq, range, &seq->lcs_space, + } else { + /* Check whether the connection to seq controller has been + * setup (lcs_exp != NULL) */ + if (seq->lcs_exp == NULL) { + mutex_unlock(&seq->lcs_mutex); + RETURN(-EINPROGRESS); + } + + rc = seq_client_rpc(seq, &seq->lcs_space, SEQ_ALLOC_SUPER, "super"); -#ifdef __KERNEL__ } -#endif - up(&seq->lcs_sem); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } -/* Request sequence-controller node to allocate new super-sequence. */ -int seq_client_alloc_super(struct lu_client_seq *seq, - const struct lu_env *env) -{ - ENTRY; - RETURN(seq_client_replay_super(seq, NULL, env)); -} - /* Request sequence-controller node to allocate new meta-sequence. */ -static int seq_client_alloc_meta(struct lu_client_seq *seq, - const struct lu_env *env) +static int seq_client_alloc_meta(const struct lu_env *env, + struct lu_client_seq *seq) { int rc; ENTRY; -#ifdef __KERNEL__ if (seq->lcs_srv) { +#ifdef HAVE_SEQ_SERVER LASSERT(env != NULL); - rc = seq_server_alloc_meta(seq->lcs_srv, NULL, - &seq->lcs_space, env); - } else { + rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env); +#else + rc = 0; #endif - rc = seq_client_rpc(seq, NULL, &seq->lcs_space, - SEQ_ALLOC_META, "meta"); -#ifdef __KERNEL__ + } else { + do { + /* If meta server return -EINPROGRESS or EAGAIN, + * it means meta server might not be ready to + * allocate super sequence from sequence controller + * (MDT0)yet */ + rc = seq_client_rpc(seq, &seq->lcs_space, + SEQ_ALLOC_META, "meta"); + } while (rc == -EINPROGRESS || rc == -EAGAIN); } -#endif + RETURN(rc); } /* Allocate new sequence for client. */ -static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr) +static int seq_client_alloc_seq(const struct lu_env *env, + struct lu_client_seq *seq, u64 *seqnr) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(range_is_sane(&seq->lcs_space)); + LASSERT(lu_seq_range_is_sane(&seq->lcs_space)); - if (range_is_exhausted(&seq->lcs_space)) { - rc = seq_client_alloc_meta(seq, NULL); + if (lu_seq_range_is_exhausted(&seq->lcs_space)) { + rc = seq_client_alloc_meta(env, seq); if (rc) { - CERROR("%s: Can't allocate new meta-sequence, " + CERROR("%s: Can't allocate new meta-sequence," "rc %d\n", seq->lcs_name, rc); RETURN(rc); } else { @@ -211,9 +227,9 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr) rc = 0; } - LASSERT(!range_is_exhausted(&seq->lcs_space)); - *seqnr = seq->lcs_space.lr_start; - seq->lcs_space.lr_start += 1; + LASSERT(!lu_seq_range_is_exhausted(&seq->lcs_space)); + *seqnr = seq->lcs_space.lsr_start; + seq->lcs_space.lsr_start += 1; CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name, *seqnr); @@ -221,27 +237,123 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr) RETURN(rc); } +static int seq_fid_alloc_prep(struct lu_client_seq *seq, + wait_queue_t *link) +{ + if (seq->lcs_update) { + add_wait_queue(&seq->lcs_waitq, link); + set_current_state(TASK_UNINTERRUPTIBLE); + mutex_unlock(&seq->lcs_mutex); + + schedule(); + + mutex_lock(&seq->lcs_mutex); + remove_wait_queue(&seq->lcs_waitq, link); + set_current_state(TASK_RUNNING); + return -EAGAIN; + } + ++seq->lcs_update; + mutex_unlock(&seq->lcs_mutex); + return 0; +} + +static void seq_fid_alloc_fini(struct lu_client_seq *seq) +{ + LASSERT(seq->lcs_update == 1); + mutex_lock(&seq->lcs_mutex); + --seq->lcs_update; + wake_up(&seq->lcs_waitq); +} + +/** + * Allocate the whole seq to the caller. + **/ +int seq_client_get_seq(const struct lu_env *env, + struct lu_client_seq *seq, u64 *seqnr) +{ + wait_queue_t link; + int rc; + + LASSERT(seqnr != NULL); + mutex_lock(&seq->lcs_mutex); + init_waitqueue_entry(&link, current); + + while (1) { + rc = seq_fid_alloc_prep(seq, &link); + if (rc == 0) + break; + } + + rc = seq_client_alloc_seq(env, seq, seqnr); + if (rc) { + CERROR("%s: Can't allocate new sequence, " + "rc %d\n", seq->lcs_name, rc); + seq_fid_alloc_fini(seq); + mutex_unlock(&seq->lcs_mutex); + return rc; + } + + CDEBUG(D_INFO, "%s: allocate sequence " + "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); + + /* Since the caller require the whole seq, + * so marked this seq to be used */ + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; + + seq->lcs_fid.f_seq = *seqnr; + seq->lcs_fid.f_ver = 0; + /* + * Inform caller that sequence switch is performed to allow it + * to setup FLD for it. + */ + seq_fid_alloc_fini(seq); + mutex_unlock(&seq->lcs_mutex); + + return rc; +} +EXPORT_SYMBOL(seq_client_get_seq); + /* Allocate new fid on passed client @seq and save it to @fid. */ -int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) +int seq_client_alloc_fid(const struct lu_env *env, + struct lu_client_seq *seq, struct lu_fid *fid) { - int rc; - ENTRY; + wait_queue_t link; + int rc; + ENTRY; + + LASSERT(seq != NULL); + LASSERT(fid != NULL); + + init_waitqueue_entry(&link, current); + mutex_lock(&seq->lcs_mutex); - LASSERT(seq != NULL); - LASSERT(fid != NULL); + if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) + seq->lcs_fid.f_oid = seq->lcs_width; - down(&seq->lcs_sem); + while (1) { + u64 seqnr; - if (fid_is_zero(&seq->lcs_fid) || - fid_oid(&seq->lcs_fid) >= seq->lcs_width) - { - seqno_t seqnr; + if (!fid_is_zero(&seq->lcs_fid) && + fid_oid(&seq->lcs_fid) < seq->lcs_width) { + /* Just bump last allocated fid and return to caller. */ + seq->lcs_fid.f_oid += 1; + rc = 0; + break; + } + + rc = seq_fid_alloc_prep(seq, &link); + if (rc) + continue; - rc = seq_client_alloc_seq(seq, &seqnr); + rc = seq_client_alloc_seq(env, seq, &seqnr); if (rc) { CERROR("%s: Can't allocate new sequence, " "rc %d\n", seq->lcs_name, rc); - up(&seq->lcs_sem); + seq_fid_alloc_fini(seq); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } @@ -257,14 +369,13 @@ int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) * to setup FLD for it. */ rc = 1; - } else { - /* Just bump last allocated fid and return to caller. */ - seq->lcs_fid.f_oid += 1; - rc = 0; + + seq_fid_alloc_fini(seq); + break; } *fid = seq->lcs_fid; - up(&seq->lcs_sem); + mutex_unlock(&seq->lcs_mutex); CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); RETURN(rc); @@ -277,26 +388,58 @@ EXPORT_SYMBOL(seq_client_alloc_fid); */ void seq_client_flush(struct lu_client_seq *seq) { - LASSERT(seq != NULL); - down(&seq->lcs_sem); + wait_queue_t link; + + LASSERT(seq != NULL); + init_waitqueue_entry(&link, current); + mutex_lock(&seq->lcs_mutex); + + while (seq->lcs_update) { + add_wait_queue(&seq->lcs_waitq, &link); + set_current_state(TASK_UNINTERRUPTIBLE); + mutex_unlock(&seq->lcs_mutex); + + schedule(); + + mutex_lock(&seq->lcs_mutex); + remove_wait_queue(&seq->lcs_waitq, &link); + set_current_state(TASK_RUNNING); + } + fid_zero(&seq->lcs_fid); - range_zero(&seq->lcs_space); - up(&seq->lcs_sem); + /** + * this id shld not be used for seq range allocation. + * set to -1 for dgb check. + */ + + seq->lcs_space.lsr_index = -1; + + lu_seq_range_init(&seq->lcs_space); + mutex_unlock(&seq->lcs_mutex); } EXPORT_SYMBOL(seq_client_flush); -static void seq_client_proc_fini(struct lu_client_seq *seq); +static void seq_client_proc_fini(struct lu_client_seq *seq) +{ +#ifdef CONFIG_PROC_FS + ENTRY; + if (seq->lcs_proc_dir) { + if (!IS_ERR(seq->lcs_proc_dir)) + lprocfs_remove(&seq->lcs_proc_dir); + seq->lcs_proc_dir = NULL; + } + EXIT; +#endif /* CONFIG_PROC_FS */ +} -#ifdef LPROCFS static int seq_client_proc_init(struct lu_client_seq *seq) { +#ifdef CONFIG_PROC_FS int rc; ENTRY; - seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, - seq_type_proc_dir, - NULL, NULL); - + seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, seq_type_proc_dir, + NULL, NULL); if (IS_ERR(seq->lcs_proc_dir)) { CERROR("%s: LProcFS failed in seq-init\n", seq->lcs_name); @@ -304,8 +447,7 @@ static int seq_client_proc_init(struct lu_client_seq *seq) RETURN(rc); } - rc = lprocfs_add_vars(seq->lcs_proc_dir, - seq_client_proc_list, seq); + rc = lprocfs_add_vars(seq->lcs_proc_dir, seq_client_proc_list, seq); if (rc) { CERROR("%s: Can't init sequence manager " "proc, rc %d\n", seq->lcs_name, rc); @@ -317,29 +459,11 @@ static int seq_client_proc_init(struct lu_client_seq *seq) out_cleanup: seq_client_proc_fini(seq); return rc; -} - -static void seq_client_proc_fini(struct lu_client_seq *seq) -{ - ENTRY; - if (seq->lcs_proc_dir) { - if (!IS_ERR(seq->lcs_proc_dir)) - lprocfs_remove(&seq->lcs_proc_dir); - seq->lcs_proc_dir = NULL; - } - EXIT; -} -#else -static int seq_client_proc_init(struct lu_client_seq *seq) -{ - return 0; -} -static void seq_client_proc_fini(struct lu_client_seq *seq) -{ - return; +#else /* !CONFIG_PROC_FS */ + return 0; +#endif /* CONFIG_PROC_FS */ } -#endif int seq_client_init(struct lu_client_seq *seq, struct obd_export *exp, @@ -347,35 +471,35 @@ int seq_client_init(struct lu_client_seq *seq, const char *prefix, struct lu_server_seq *srv) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(seq != NULL); - LASSERT(prefix != NULL); + LASSERT(seq != NULL); + LASSERT(prefix != NULL); - seq->lcs_exp = exp; - seq->lcs_srv = srv; - seq->lcs_type = type; - sema_init(&seq->lcs_sem, 1); - seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH; + seq->lcs_srv = srv; + seq->lcs_type = type; - /* Make sure that things are clear before work is started. */ - seq_client_flush(seq); + mutex_init(&seq->lcs_mutex); + if (type == LUSTRE_SEQ_METADATA) + seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; - if (exp == NULL) { - LASSERT(seq->lcs_srv != NULL); - } else { - LASSERT(seq->lcs_exp != NULL); - seq->lcs_exp = class_export_get(seq->lcs_exp); - } + init_waitqueue_head(&seq->lcs_waitq); + /* Make sure that things are clear before work is started. */ + seq_client_flush(seq); - snprintf(seq->lcs_name, sizeof(seq->lcs_name), - "cli-%s", prefix); + if (exp != NULL) + seq->lcs_exp = class_export_get(exp); - rc = seq_client_proc_init(seq); - if (rc) - seq_client_fini(seq); - RETURN(rc); + snprintf(seq->lcs_name, sizeof(seq->lcs_name), + "cli-%s", prefix); + + rc = seq_client_proc_init(seq); + if (rc) + seq_client_fini(seq); + RETURN(rc); } EXPORT_SYMBOL(seq_client_init); @@ -394,3 +518,87 @@ void seq_client_fini(struct lu_client_seq *seq) EXIT; } EXPORT_SYMBOL(seq_client_fini); + +int client_fid_init(struct obd_device *obd, + struct obd_export *exp, enum lu_cli_type type) +{ + struct client_obd *cli = &obd->u.cli; + char *prefix; + int rc; + ENTRY; + + OBD_ALLOC_PTR(cli->cl_seq); + if (cli->cl_seq == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(prefix, MAX_OBD_NAME + 5); + if (prefix == NULL) + GOTO(out_free_seq, rc = -ENOMEM); + + snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name); + + /* Init client side sequence-manager */ + rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL); + OBD_FREE(prefix, MAX_OBD_NAME + 5); + if (rc) + GOTO(out_free_seq, rc); + + RETURN(rc); +out_free_seq: + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + return rc; +} +EXPORT_SYMBOL(client_fid_init); + +int client_fid_fini(struct obd_device *obd) +{ + struct client_obd *cli = &obd->u.cli; + ENTRY; + + if (cli->cl_seq != NULL) { + seq_client_fini(cli->cl_seq); + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + + RETURN(0); +} +EXPORT_SYMBOL(client_fid_fini); + +struct proc_dir_entry *seq_type_proc_dir; + +static int __init fid_mod_init(void) +{ + seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME, + proc_lustre_root, + NULL, NULL); + if (IS_ERR(seq_type_proc_dir)) + return PTR_ERR(seq_type_proc_dir); + +# ifdef HAVE_SERVER_SUPPORT + fid_server_mod_init(); +# endif + + return 0; +} + +static void __exit fid_mod_exit(void) +{ +# ifdef HAVE_SERVER_SUPPORT + fid_server_mod_exit(); +# endif + + if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) { + lprocfs_remove(&seq_type_proc_dir); + seq_type_proc_dir = NULL; + } +} + +MODULE_AUTHOR("Sun Microsystems, Inc. "); +MODULE_DESCRIPTION("Lustre FID Module"); +MODULE_VERSION(LUSTRE_VERSION_STRING); +MODULE_LICENSE("GPL"); + +module_init(fid_mod_init); +module_exit(fid_mod_exit);