X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ffid%2Ffid_request.c;h=413d0f5ed2d5bff2353a59e71e08cd79d03ae1a4;hp=3910422e6dc06f5929b87634b92abe5247295333;hb=00e814ff75c4da15a769f18736c70ab1e6a5b174;hpb=fd908da92ccd9aab4ffc3d2463301831260c0474 diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index 3910422..413d0f5 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -1,34 +1,45 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * lustre/fid/fid_request.c - * Lustre Sequence Manager + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Copyright (c) 2006 Cluster File Systems, Inc. - * Author: Yury Umanets + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * GPL HEADER END + */ +/* + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * Copyright (c) 2011, 2012, Intel Corporation. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/fid/fid_request.c + * + * Lustre Sequence Manager + * + * Author: Yury Umanets */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_FID #ifdef __KERNEL__ @@ -49,112 +60,116 @@ #include #include "fid_internal.h" -static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input, - struct lu_range *output, __u32 opc, +static int seq_client_rpc(struct lu_client_seq *seq, + struct lu_seq_range *output, __u32 opc, const char *opcname) { - struct obd_export *exp = seq->lcs_exp; - struct ptlrpc_request *req; - struct lu_range *out, *in; - __u32 *op; - int rc; - ENTRY; - - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, - LUSTRE_MDS_VERSION, SEQ_QUERY); - if (req == NULL) - RETURN(-ENOMEM); - - /* Init operation code */ - op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); - *op = opc; - - /* Zero out input range, this is not recovery yet. */ - in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); - if (input != NULL) - *in = *input; - else - range_zero(in); - - ptlrpc_request_set_replen(req); - - if (seq->lcs_type == LUSTRE_SEQ_METADATA) { - req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ? - SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL; - } else { - req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ? - SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL; - } - - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - - if (rc) - GOTO(out_req, rc); - - out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); - *output = *out; - - if (!range_is_sane(output)) { - CERROR("%s: Invalid range received from server: " - DRANGE"\n", seq->lcs_name, PRANGE(output)); - GOTO(out_req, rc = -EINVAL); - } - - if (range_is_exhausted(output)) { - CERROR("%s: Range received from server is exhausted: " - DRANGE"]\n", seq->lcs_name, PRANGE(output)); - GOTO(out_req, rc = -EINVAL); - } - *in = *out; - - CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n", - seq->lcs_name, opcname, PRANGE(output)); - - EXIT; + struct obd_export *exp = seq->lcs_exp; + struct ptlrpc_request *req; + struct lu_seq_range *out, *in; + __u32 *op; + unsigned int debug_mask; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, + LUSTRE_MDS_VERSION, SEQ_QUERY); + if (req == NULL) + RETURN(-ENOMEM); + + /* Init operation code */ + op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); + *op = opc; + + /* Zero out input range, this is not recovery yet. */ + in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); + range_init(in); + + ptlrpc_request_set_replen(req); + + if (seq->lcs_type == LUSTRE_SEQ_METADATA) { + req->rq_request_portal = SEQ_METADATA_PORTAL; + in->lsr_flags = LU_SEQ_RANGE_MDT; + } else { + LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA, + "unknown lcs_type %u\n", seq->lcs_type); + req->rq_request_portal = SEQ_DATA_PORTAL; + in->lsr_flags = LU_SEQ_RANGE_OST; + } + + if (opc == SEQ_ALLOC_SUPER) { + /* Update index field of *in, it is required for + * FLD update on super sequence allocator node. */ + in->lsr_index = seq->lcs_space.lsr_index; + req->rq_request_portal = SEQ_CONTROLLER_PORTAL; + debug_mask = D_CONSOLE; + } else { + debug_mask = D_INFO; + LASSERTF(opc == SEQ_ALLOC_META, + "unknown opcode %u\n, opc", opc); + } + + ptlrpc_at_set_req_timeout(req); + + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + rc = ptlrpc_queue_wait(req); + mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + + if (rc) + GOTO(out_req, rc); + + out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); + *output = *out; + + if (!range_is_sane(output)) { + CERROR("%s: Invalid range received from server: " + DRANGE"\n", seq->lcs_name, PRANGE(output)); + GOTO(out_req, rc = -EINVAL); + } + + if (range_is_exhausted(output)) { + CERROR("%s: Range received from server is exhausted: " + DRANGE"]\n", seq->lcs_name, PRANGE(output)); + GOTO(out_req, rc = -EINVAL); + } + + CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n", + seq->lcs_name, opcname, PRANGE(output)); + + EXIT; out_req: - ptlrpc_req_finished(req); - return rc; + ptlrpc_req_finished(req); + return rc; } /* Request sequence-controller node to allocate new super-sequence. */ -int seq_client_replay_super(struct lu_client_seq *seq, - struct lu_range *range, - const struct lu_env *env) +int seq_client_alloc_super(struct lu_client_seq *seq, + const struct lu_env *env) { int rc; ENTRY; - down(&seq->lcs_sem); + mutex_lock(&seq->lcs_mutex); #ifdef __KERNEL__ if (seq->lcs_srv) { LASSERT(env != NULL); - rc = seq_server_alloc_super(seq->lcs_srv, range, - &seq->lcs_space, env); + rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space, + env); } else { #endif - rc = seq_client_rpc(seq, range, &seq->lcs_space, + rc = seq_client_rpc(seq, &seq->lcs_space, SEQ_ALLOC_SUPER, "super"); #ifdef __KERNEL__ } #endif - up(&seq->lcs_sem); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } -/* Request sequence-controller node to allocate new super-sequence. */ -int seq_client_alloc_super(struct lu_client_seq *seq, - const struct lu_env *env) -{ - ENTRY; - RETURN(seq_client_replay_super(seq, NULL, env)); -} - /* Request sequence-controller node to allocate new meta-sequence. */ -static int seq_client_alloc_meta(struct lu_client_seq *seq, - const struct lu_env *env) +static int seq_client_alloc_meta(const struct lu_env *env, + struct lu_client_seq *seq) { int rc; ENTRY; @@ -162,11 +177,10 @@ static int seq_client_alloc_meta(struct lu_client_seq *seq, #ifdef __KERNEL__ if (seq->lcs_srv) { LASSERT(env != NULL); - rc = seq_server_alloc_meta(seq->lcs_srv, NULL, - &seq->lcs_space, env); + rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env); } else { #endif - rc = seq_client_rpc(seq, NULL, &seq->lcs_space, + rc = seq_client_rpc(seq, &seq->lcs_space, SEQ_ALLOC_META, "meta"); #ifdef __KERNEL__ } @@ -175,7 +189,8 @@ static int seq_client_alloc_meta(struct lu_client_seq *seq, } /* Allocate new sequence for client. */ -static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr) +static int seq_client_alloc_seq(const struct lu_env *env, + struct lu_client_seq *seq, seqno_t *seqnr) { int rc; ENTRY; @@ -183,9 +198,9 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr) LASSERT(range_is_sane(&seq->lcs_space)); if (range_is_exhausted(&seq->lcs_space)) { - rc = seq_client_alloc_meta(seq, NULL); + rc = seq_client_alloc_meta(env, seq); if (rc) { - CERROR("%s: Can't allocate new meta-sequence, " + CERROR("%s: Can't allocate new meta-sequence," "rc %d\n", seq->lcs_name, rc); RETURN(rc); } else { @@ -197,8 +212,8 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr) } LASSERT(!range_is_exhausted(&seq->lcs_space)); - *seqnr = seq->lcs_space.lr_start; - seq->lcs_space.lr_start += 1; + *seqnr = seq->lcs_space.lsr_start; + seq->lcs_space.lsr_start += 1; CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name, *seqnr); @@ -206,27 +221,118 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr) RETURN(rc); } +static int seq_fid_alloc_prep(struct lu_client_seq *seq, + cfs_waitlink_t *link) +{ + if (seq->lcs_update) { + cfs_waitq_add(&seq->lcs_waitq, link); + cfs_set_current_state(CFS_TASK_UNINT); + mutex_unlock(&seq->lcs_mutex); + + cfs_waitq_wait(link, CFS_TASK_UNINT); + + mutex_lock(&seq->lcs_mutex); + cfs_waitq_del(&seq->lcs_waitq, link); + cfs_set_current_state(CFS_TASK_RUNNING); + return -EAGAIN; + } + ++seq->lcs_update; + mutex_unlock(&seq->lcs_mutex); + return 0; +} + +static void seq_fid_alloc_fini(struct lu_client_seq *seq) +{ + LASSERT(seq->lcs_update == 1); + mutex_lock(&seq->lcs_mutex); + --seq->lcs_update; + cfs_waitq_signal(&seq->lcs_waitq); +} + +/* Allocate the whole seq to the caller*/ +int seq_client_get_seq(const struct lu_env *env, + struct lu_client_seq *seq, seqno_t *seqnr) +{ + cfs_waitlink_t link; + int rc; + + LASSERT(seqnr != NULL); + mutex_lock(&seq->lcs_mutex); + cfs_waitlink_init(&link); + + while (1) { + rc = seq_fid_alloc_prep(seq, &link); + if (rc == 0) + break; + } + + rc = seq_client_alloc_seq(env, seq, seqnr); + if (rc) { + CERROR("%s: Can't allocate new sequence, " + "rc %d\n", seq->lcs_name, rc); + seq_fid_alloc_fini(seq); + mutex_unlock(&seq->lcs_mutex); + return rc; + } + + CDEBUG(D_INFO, "%s: allocate sequence " + "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); + + /*Since the caller require the whole seq, + *so marked this seq to be used*/ + seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; + seq->lcs_fid.f_seq = *seqnr; + seq->lcs_fid.f_ver = 0; + + /* + * Inform caller that sequence switch is performed to allow it + * to setup FLD for it. + */ + seq_fid_alloc_fini(seq); + mutex_unlock(&seq->lcs_mutex); + + return rc; +} +EXPORT_SYMBOL(seq_client_get_seq); + /* Allocate new fid on passed client @seq and save it to @fid. */ -int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) +int seq_client_alloc_fid(const struct lu_env *env, + struct lu_client_seq *seq, struct lu_fid *fid) { + cfs_waitlink_t link; int rc; ENTRY; LASSERT(seq != NULL); LASSERT(fid != NULL); - down(&seq->lcs_sem); + cfs_waitlink_init(&link); + mutex_lock(&seq->lcs_mutex); + + if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) + seq->lcs_fid.f_oid = seq->lcs_width; - if (fid_is_zero(&seq->lcs_fid) || - fid_oid(&seq->lcs_fid) >= seq->lcs_width) - { + while (1) { seqno_t seqnr; - rc = seq_client_alloc_seq(seq, &seqnr); + if (!fid_is_zero(&seq->lcs_fid) && + fid_oid(&seq->lcs_fid) < seq->lcs_width) { + /* Just bump last allocated fid and return to caller. */ + seq->lcs_fid.f_oid += 1; + rc = 0; + break; + } + + rc = seq_fid_alloc_prep(seq, &link); + if (rc) + continue; + + rc = seq_client_alloc_seq(env, seq, &seqnr); if (rc) { CERROR("%s: Can't allocate new sequence, " "rc %d\n", seq->lcs_name, rc); - up(&seq->lcs_sem); + seq_fid_alloc_fini(seq); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } @@ -242,14 +348,13 @@ int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid) * to setup FLD for it. */ rc = 1; - } else { - /* Just bump last allocated fid and return to caller. */ - seq->lcs_fid.f_oid += 1; - rc = 0; + + seq_fid_alloc_fini(seq); + break; } *fid = seq->lcs_fid; - up(&seq->lcs_sem); + mutex_unlock(&seq->lcs_mutex); CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); RETURN(rc); @@ -262,11 +367,34 @@ EXPORT_SYMBOL(seq_client_alloc_fid); */ void seq_client_flush(struct lu_client_seq *seq) { + cfs_waitlink_t link; + LASSERT(seq != NULL); - down(&seq->lcs_sem); + cfs_waitlink_init(&link); + mutex_lock(&seq->lcs_mutex); + + while (seq->lcs_update) { + cfs_waitq_add(&seq->lcs_waitq, &link); + cfs_set_current_state(CFS_TASK_UNINT); + mutex_unlock(&seq->lcs_mutex); + + cfs_waitq_wait(&link, CFS_TASK_UNINT); + + mutex_lock(&seq->lcs_mutex); + cfs_waitq_del(&seq->lcs_waitq, &link); + cfs_set_current_state(CFS_TASK_RUNNING); + } + fid_zero(&seq->lcs_fid); - range_zero(&seq->lcs_space); - up(&seq->lcs_sem); + /** + * this id shld not be used for seq range allocation. + * set to -1 for dgb check. + */ + + seq->lcs_space.lsr_index = -1; + + range_init(&seq->lcs_space); + mutex_unlock(&seq->lcs_mutex); } EXPORT_SYMBOL(seq_client_flush); @@ -332,35 +460,37 @@ int seq_client_init(struct lu_client_seq *seq, const char *prefix, struct lu_server_seq *srv) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(seq != NULL); - LASSERT(prefix != NULL); + LASSERT(seq != NULL); + LASSERT(prefix != NULL); - seq->lcs_exp = exp; - seq->lcs_srv = srv; - seq->lcs_type = type; - sema_init(&seq->lcs_sem, 1); - seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH; + seq->lcs_srv = srv; + seq->lcs_type = type; - /* Make sure that things are clear before work is started. */ - seq_client_flush(seq); + mutex_init(&seq->lcs_mutex); + if (type == LUSTRE_SEQ_METADATA) + seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; - if (exp == NULL) { - LASSERT(seq->lcs_srv != NULL); - } else { - LASSERT(seq->lcs_exp != NULL); - seq->lcs_exp = class_export_get(seq->lcs_exp); - } + cfs_waitq_init(&seq->lcs_waitq); + /* Make sure that things are clear before work is started. */ + seq_client_flush(seq); - snprintf(seq->lcs_name, sizeof(seq->lcs_name), - "cli-%s", prefix); + if (exp != NULL) + seq->lcs_exp = class_export_get(exp); + else if (type == LUSTRE_SEQ_METADATA) + LASSERT(seq->lcs_srv != NULL); - rc = seq_client_proc_init(seq); - if (rc) - seq_client_fini(seq); - RETURN(rc); + snprintf(seq->lcs_name, sizeof(seq->lcs_name), + "cli-%s", prefix); + + rc = seq_client_proc_init(seq); + if (rc) + seq_client_fini(seq); + RETURN(rc); } EXPORT_SYMBOL(seq_client_init);