X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ffid%2Ffid_handler.c;h=e85d0efa37379d9f0e60569d03b46fd24b99e885;hb=refs%2Ftags%2F1.10.0.41a;hp=0522eb8e3a49b9d874173a762b555f6d2150bbfb;hpb=a16eedc674f96364a8a7e4905112dc45d10e2f84;p=fs%2Flustre-release.git diff --git a/lustre/fid/fid_handler.c b/lustre/fid/fid_handler.c index 0522eb8..e85d0ef 100644 --- a/lustre/fid/fid_handler.c +++ b/lustre/fid/fid_handler.c @@ -1,29 +1,43 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/fid/fid_handler.c - * Lustre Sequence Manager + * GPL HEADER START * - * Copyright (c) 2006 Cluster File Systems, Inc. - * Author: Yury Umanets + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/fid/fid_handler.c + * + * Lustre Sequence Manager + * + * Author: Yury Umanets */ #ifndef EXPORT_SYMTAB @@ -60,7 +74,7 @@ int seq_server_set_cli(struct lu_server_seq *seq, * Ask client for new range, assign that range to ->seq_space and write * seq state to backing store should be atomic. */ - down(&seq->lss_sem); + cfs_down(&seq->lss_sem); if (cli == NULL) { CDEBUG(D_INFO, "%s: Detached sequence client %s\n", @@ -79,23 +93,30 @@ int seq_server_set_cli(struct lu_server_seq *seq, seq->lss_name, cli->lcs_name); seq->lss_cli = cli; + cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id; EXIT; out_up: - up(&seq->lss_sem); + cfs_up(&seq->lss_sem); return rc; } EXPORT_SYMBOL(seq_server_set_cli); -/* +/** * On controller node, allocate new super sequence for regular sequence server. + * As this super sequence controller, this node suppose to maintain fld + * and update index. + * \a out range always has currect mds node number of requester. */ + static int __seq_server_alloc_super(struct lu_server_seq *seq, - struct lu_range *in, - struct lu_range *out, + struct lu_seq_range *in, + struct lu_seq_range *out, const struct lu_env *env) { - struct lu_range *space = &seq->lss_space; - int rc; + struct lu_seq_range *space = &seq->lss_space; + struct thandle *th; + __u64 mdt = out->lsr_mdt; + int rc, credit; ENTRY; LASSERT(range_is_sane(space)); @@ -104,8 +125,8 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq, CDEBUG(D_INFO, "%s: Input seq range: " DRANGE"\n", seq->lss_name, PRANGE(in)); - if (in->lr_end > space->lr_start) - space->lr_start = in->lr_end; + if (in->lsr_end > space->lsr_start) + space->lsr_start = in->lsr_end; *out = *in; CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n", @@ -116,7 +137,7 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq, "Only "LPU64" sequences left\n", seq->lss_name, range_space(space)); *out = *space; - space->lr_start = space->lr_end; + space->lsr_start = space->lsr_end; } else if (range_is_exhausted(space)) { CERROR("%s: Sequences space is exhausted\n", seq->lss_name); @@ -125,42 +146,61 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq, range_alloc(out, space, seq->lss_width); } } + out->lsr_mdt = mdt; - rc = seq_store_write(seq, env); + credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS; + + th = seq_store_trans_start(seq, env, credit); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = seq_store_write(seq, env, th); if (rc) { CERROR("%s: Can't write space data, rc %d\n", seq->lss_name, rc); - RETURN(rc); + goto out; + } + + rc = fld_server_create(seq->lss_site->ms_server_fld, + env, out, th); + if (rc) { + CERROR("%s: Can't Update fld database, rc %d\n", + seq->lss_name, rc); } - CDEBUG(D_INFO, "%s: Allocated super-sequence " - DRANGE"\n", seq->lss_name, PRANGE(out)); +out: + seq_store_trans_stop(seq, env, th); + + CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d " + DRANGE"\n", seq->lss_name, rc, PRANGE(out)); RETURN(rc); } int seq_server_alloc_super(struct lu_server_seq *seq, - struct lu_range *in, - struct lu_range *out, + struct lu_seq_range *in, + struct lu_seq_range *out, const struct lu_env *env) { int rc; ENTRY; - down(&seq->lss_sem); + cfs_down(&seq->lss_sem); rc = __seq_server_alloc_super(seq, in, out, env); - up(&seq->lss_sem); + cfs_up(&seq->lss_sem); RETURN(rc); } static int __seq_server_alloc_meta(struct lu_server_seq *seq, - struct lu_range *in, - struct lu_range *out, + struct lu_seq_range *in, + struct lu_seq_range *out, const struct lu_env *env) { - struct lu_range *space = &seq->lss_space; + struct lu_seq_range *space = &seq->lss_space; + struct thandle *th; int rc = 0; + ENTRY; LASSERT(range_is_sane(space)); @@ -173,28 +213,26 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, CDEBUG(D_INFO, "%s: Input seq range: " DRANGE"\n", seq->lss_name, PRANGE(in)); - if (range_is_exhausted(space)) { + if (in->lsr_end <= space->lsr_start) { /* - * Server cannot send empty range to client, this is why - * we check here that range from client is "newer" than - * exhausted super. + * Client is replaying a fairly old range, server + * don't need to do any allocation. */ - LASSERT(in->lr_end > space->lr_start); - + } else if (range_is_exhausted(space)) { /* * Start is set to end of last allocated, because it * *is* already allocated so we take that into account * and do not use for other allocations. */ - space->lr_start = in->lr_end; + space->lsr_start = in->lsr_end; /* - * End is set to in->lr_start + super sequence - * allocation unit. That is because in->lr_start is + * End is set to in->lsr_start + super sequence + * allocation unit. That is because in->lsr_start is * first seq in new allocated range from controller * before failure. */ - space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH; + space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH; if (!seq->lss_cli) { CERROR("%s: No sequence controller " @@ -207,6 +245,7 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, * obtained range from it was @space. */ rc = seq_client_replay_super(seq->lss_cli, space, env); + if (rc) { CERROR("%s: Can't replay super-sequence, " "rc %d\n", seq->lss_name, rc); @@ -217,8 +256,21 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, * Update super start by end from client's range. Super * end should not be changed if range was not exhausted. */ - if (in->lr_end > space->lr_start) - space->lr_start = in->lr_end; + space->lsr_start = in->lsr_end; + } + + /* sending replay_super to update fld as only super sequence + * server can update fld. + * we are sending meta sequence to fld rather than super + * sequence, but fld server can handle range merging. */ + + in->lsr_mdt = space->lsr_mdt; + rc = seq_client_replay_super(seq->lss_cli, in, env); + + if (rc) { + CERROR("%s: Can't replay super-sequence, " + "rc %d\n", seq->lss_name, rc); + RETURN(rc); } *out = *in; @@ -252,7 +304,11 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, range_alloc(out, space, seq->lss_width); } - rc = seq_store_write(seq, env); + th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = seq_store_write(seq, env, th); if (rc) { CERROR("%s: Can't write space data, rc %d\n", seq->lss_name, rc); @@ -263,20 +319,21 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, DRANGE"\n", seq->lss_name, PRANGE(out)); } + seq_store_trans_stop(seq, env, th); RETURN(rc); } int seq_server_alloc_meta(struct lu_server_seq *seq, - struct lu_range *in, - struct lu_range *out, + struct lu_seq_range *in, + struct lu_seq_range *out, const struct lu_env *env) { int rc; ENTRY; - down(&seq->lss_sem); + cfs_down(&seq->lss_sem); rc = __seq_server_alloc_meta(seq, in, out, env); - up(&seq->lss_sem); + cfs_up(&seq->lss_sem); RETURN(rc); } @@ -284,29 +341,31 @@ EXPORT_SYMBOL(seq_server_alloc_meta); static int seq_server_handle(struct lu_site *site, const struct lu_env *env, - __u32 opc, struct lu_range *in, - struct lu_range *out) + __u32 opc, struct lu_seq_range *in, + struct lu_seq_range *out) { int rc; + struct md_site *mite; ENTRY; + mite = lu_site2md(site); switch (opc) { case SEQ_ALLOC_META: - if (!site->ls_server_seq) { + if (!mite->ms_server_seq) { CERROR("Sequence server is not " "initialized\n"); RETURN(-EINVAL); } - rc = seq_server_alloc_meta(site->ls_server_seq, + rc = seq_server_alloc_meta(mite->ms_server_seq, in, out, env); break; case SEQ_ALLOC_SUPER: - if (!site->ls_control_seq) { + if (!mite->ms_control_seq) { CERROR("Sequence controller is not " "initialized\n"); RETURN(-EINVAL); } - rc = seq_server_alloc_super(site->ls_control_seq, + rc = seq_server_alloc_super(mite->ms_control_seq, in, out, env); break; default: @@ -321,7 +380,7 @@ static int seq_req_handle(struct ptlrpc_request *req, const struct lu_env *env, struct seq_thread_info *info) { - struct lu_range *out, *in = NULL; + struct lu_seq_range *out, *in = NULL, *tmp; struct lu_site *site; int rc = -EPROTO; __u32 *opc; @@ -330,25 +389,31 @@ static int seq_req_handle(struct ptlrpc_request *req, site = req->rq_export->exp_obd->obd_lu_dev->ld_site; LASSERT(site != NULL); - rc = req_capsule_pack(&info->sti_pill); + rc = req_capsule_server_pack(info->sti_pill); if (rc) RETURN(err_serious(rc)); - opc = req_capsule_client_get(&info->sti_pill, - &RMF_SEQ_OPC); + opc = req_capsule_client_get(info->sti_pill, &RMF_SEQ_OPC); if (opc != NULL) { - out = req_capsule_server_get(&info->sti_pill, - &RMF_SEQ_RANGE); + out = req_capsule_server_get(info->sti_pill, &RMF_SEQ_RANGE); if (out == NULL) RETURN(err_serious(-EPROTO)); + tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE); + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { - in = req_capsule_client_get(&info->sti_pill, - &RMF_SEQ_RANGE); + in = tmp; - LASSERT(!range_is_zero(in) && range_is_sane(in)); + if (range_is_zero(in) || !range_is_sane(in)) { + CERROR("Replayed seq range is invalid: " + DRANGE"\n", PRANGE(in)); + RETURN(err_serious(-EINVAL)); + } } + /* seq client passed mdt id, we need to pass that using out + * range parameter */ + out->lsr_mdt = tmp->lsr_mdt; rc = seq_server_handle(site, env, *opc, in, out); } else rc = err_serious(-EPROTO); @@ -356,29 +421,24 @@ static int seq_req_handle(struct ptlrpc_request *req, RETURN(rc); } +/* context key constructor/destructor: seq_key_init, seq_key_fini */ LU_KEY_INIT_FINI(seq, struct seq_thread_info); +/* context key: seq_thread_key */ LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD); static void seq_thread_info_init(struct ptlrpc_request *req, struct seq_thread_info *info) { - int i; - - /* Mark rep buffer as req-layout stuff expects */ - for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++) - info->sti_rep_buf_size[i] = -1; - + info->sti_pill = &req->rq_pill; /* Init request capsule */ - req_capsule_init(&info->sti_pill, req, RCL_SERVER, - info->sti_rep_buf_size); - - req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY); + req_capsule_init(info->sti_pill, req, RCL_SERVER); + req_capsule_set(info->sti_pill, &RQF_SEQ_QUERY); } static void seq_thread_info_fini(struct seq_thread_info *info) { - req_capsule_fini(&info->sti_pill); + req_capsule_fini(info->sti_pill); } static int seq_handle(struct ptlrpc_request *req) @@ -405,7 +465,7 @@ static int seq_handle(struct ptlrpc_request *req) */ int seq_query(struct com_thread_info *info) { - return seq_handle(info->cti_pill.rc_req); + return seq_handle(info->cti_pill->rc_req); } EXPORT_SYMBOL(seq_query); @@ -466,8 +526,10 @@ int seq_server_init(struct lu_server_seq *seq, struct dt_device *dev, const char *prefix, enum lu_mgr_type type, + struct md_site *ms, const struct lu_env *env) { + struct thandle *th; int rc, is_srv = (type == LUSTRE_SEQ_SERVER); ENTRY; @@ -476,8 +538,9 @@ int seq_server_init(struct lu_server_seq *seq, seq->lss_cli = NULL; seq->lss_type = type; - range_zero(&seq->lss_space); - sema_init(&seq->lss_sem, 1); + seq->lss_site = ms; + range_init(&seq->lss_space); + cfs_sema_init(&seq->lss_sem, 1); seq->lss_width = is_srv ? LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH; @@ -488,7 +551,6 @@ int seq_server_init(struct lu_server_seq *seq, rc = seq_store_init(seq, env, dev); if (rc) GOTO(out, rc); - /* Request backing store for saved sequence info. */ rc = seq_store_read(seq, env); if (rc == -ENODATA) { @@ -498,16 +560,22 @@ int seq_server_init(struct lu_server_seq *seq, LUSTRE_SEQ_ZERO_RANGE: LUSTRE_SEQ_SPACE_RANGE; + seq->lss_space.lsr_mdt = ms->ms_node_id; CDEBUG(D_INFO, "%s: No data found " "on store. Initialize space\n", seq->lss_name); + th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + /* Save default controller value to store. */ - rc = seq_store_write(seq, env); + rc = seq_store_write(seq, env, th); if (rc) { CERROR("%s: Can't write space data, " "rc %d\n", seq->lss_name, rc); } + seq_store_trans_stop(seq, env, th); } else if (rc) { CERROR("%s: Can't read space data, rc %d\n", seq->lss_name, rc); @@ -547,6 +615,18 @@ EXPORT_SYMBOL(seq_server_fini); cfs_proc_dir_entry_t *seq_type_proc_dir = NULL; +static struct lu_local_obj_desc llod_seq_srv = { + .llod_name = LUSTRE_SEQ_SRV_NAME, + .llod_oid = FID_SEQ_SRV_OID, + .llod_is_index = 0, +}; + +static struct lu_local_obj_desc llod_seq_ctl = { + .llod_name = LUSTRE_SEQ_CTL_NAME, + .llod_oid = FID_SEQ_CTL_OID, + .llod_is_index = 0, +}; + static int __init fid_mod_init(void) { seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME, @@ -555,6 +635,9 @@ static int __init fid_mod_init(void) if (IS_ERR(seq_type_proc_dir)) return PTR_ERR(seq_type_proc_dir); + llo_local_obj_register(&llod_seq_srv); + llo_local_obj_register(&llod_seq_ctl); + LU_CONTEXT_KEY_INIT(&seq_thread_key); lu_context_key_register(&seq_thread_key); return 0; @@ -562,6 +645,9 @@ static int __init fid_mod_init(void) static void __exit fid_mod_exit(void) { + llo_local_obj_unregister(&llod_seq_srv); + llo_local_obj_unregister(&llod_seq_ctl); + lu_context_key_degister(&seq_thread_key); if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) { lprocfs_remove(&seq_type_proc_dir); @@ -569,7 +655,7 @@ static void __exit fid_mod_exit(void) } } -MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre FID Module"); MODULE_LICENSE("GPL");