X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ffld%2Ffld_request.c;h=2e18b08ab5886a7b707fbd7a4ca7b7d14bcc89f9;hp=fce247cdfc4eb5b2ff544a660b85a26fd9854206;hb=e2af7fb3c91dfb13d34d8e1b2f2df8c09621f768;hpb=83f755685e50da38cb916ae4fb83f62429b5d568 diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c index fce247c..2e18b08 100644 --- a/lustre/fld/fld_request.c +++ b/lustre/fld/fld_request.c @@ -1,33 +1,45 @@ -/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * GPL HEADER START * - * lustre/fld/fld_request.c - * FLD (Fids Location Database) + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Copyright (C) 2006 Cluster File Systems, Inc. - * Author: Yury Umanets + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * GPL HEADER END */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif +/* + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2011, Whamcloud, Inc. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/fld/fld_request.c + * + * FLD (Fids Location Database) + * + * Author: Yury Umanets + */ + #define DEBUG_SUBSYSTEM S_FLD #ifdef __KERNEL__ @@ -53,15 +65,15 @@ #include #include "fld_internal.h" -/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c +/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c * It should be common thing. The same about mdc RPC lock */ static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw) { int rc; ENTRY; - spin_lock(&cli->cl_loi_list_lock); - rc = list_empty(&mcw->mcw_entry); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); + rc = cfs_list_empty(&mcw->mcw_entry); + client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(rc); }; @@ -70,38 +82,38 @@ static void fld_enter_request(struct client_obd *cli) struct mdc_cache_waiter mcw; struct l_wait_info lwi = { 0 }; - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); + cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); cfs_waitq_init(&mcw.mcw_waitq); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi); } else { cli->cl_r_in_flight++; - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); } } static void fld_exit_request(struct client_obd *cli) { - struct list_head *l, *tmp; + cfs_list_t *l, *tmp; struct mdc_cache_waiter *mcw; - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); cli->cl_r_in_flight--; - list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { - + cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { + if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { /* No free request slots anymore */ break; } - mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry); - list_del_init(&mcw->mcw_entry); + mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry); + cfs_list_del_init(&mcw->mcw_entry); cli->cl_r_in_flight++; cfs_waitq_signal(&mcw->mcw_waitq); } - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); } static int fld_rrb_hash(struct lu_client_fld *fld, @@ -120,7 +132,7 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) hash = fld_rrb_hash(fld, seq); - list_for_each_entry(target, &fld->lcf_targets, ft_chain) { + cfs_list_for_each_entry(target, &fld->lcf_targets, ft_chain) { if (target->ft_idx == hash) RETURN(target); } @@ -129,7 +141,7 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) "Targets (%d):\n", fld->lcf_name, hash, seq, fld->lcf_count); - list_for_each_entry(target, &fld->lcf_targets, ft_chain) { + cfs_list_for_each_entry(target, &fld->lcf_targets, ft_chain) { const char *srv_name = target->ft_srv != NULL ? target->ft_srv->lsf_name : ""; const char *exp_name = target->ft_exp != NULL ? @@ -149,26 +161,7 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) RETURN(NULL); } -static int fld_dht_hash(struct lu_client_fld *fld, - seqno_t seq) -{ - /* XXX: here should be DHT hash */ - return fld_rrb_hash(fld, seq); -} - -static struct lu_fld_target * -fld_dht_scan(struct lu_client_fld *fld, seqno_t seq) -{ - /* XXX: here should be DHT scan code */ - return fld_rrb_scan(fld, seq); -} - -struct lu_fld_hash fld_hash[3] = { - { - .fh_name = "DHT", - .fh_hash_func = fld_dht_hash, - .fh_scan_func = fld_dht_scan - }, +struct lu_fld_hash fld_hash[] = { { .fh_name = "RRB", .fh_hash_func = fld_rrb_hash, @@ -188,9 +181,9 @@ fld_client_get_target(struct lu_client_fld *fld, LASSERT(fld->lcf_hash != NULL); - spin_lock(&fld->lcf_lock); + cfs_spin_lock(&fld->lcf_lock); target = fld->lcf_hash->fh_scan_func(fld, seq); - spin_unlock(&fld->lcf_lock); + cfs_spin_unlock(&fld->lcf_lock); if (target != NULL) { CDEBUG(D_INFO, "%s: Found target (idx "LPU64 @@ -230,10 +223,10 @@ int fld_client_add_target(struct lu_client_fld *fld, if (target == NULL) RETURN(-ENOMEM); - spin_lock(&fld->lcf_lock); - list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { + cfs_spin_lock(&fld->lcf_lock); + cfs_list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { if (tmp->ft_idx == tar->ft_idx) { - spin_unlock(&fld->lcf_lock); + cfs_spin_unlock(&fld->lcf_lock); OBD_FREE_PTR(target); CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n", name, fld_target_name(tmp), tmp->ft_idx); @@ -247,11 +240,11 @@ int fld_client_add_target(struct lu_client_fld *fld, target->ft_srv = tar->ft_srv; target->ft_idx = tar->ft_idx; - list_add_tail(&target->ft_chain, - &fld->lcf_targets); + cfs_list_add_tail(&target->ft_chain, + &fld->lcf_targets); fld->lcf_count++; - spin_unlock(&fld->lcf_lock); + cfs_spin_unlock(&fld->lcf_lock); RETURN(0); } @@ -264,13 +257,13 @@ int fld_client_del_target(struct lu_client_fld *fld, struct lu_fld_target *target, *tmp; ENTRY; - spin_lock(&fld->lcf_lock); - list_for_each_entry_safe(target, tmp, - &fld->lcf_targets, ft_chain) { + cfs_spin_lock(&fld->lcf_lock); + cfs_list_for_each_entry_safe(target, tmp, + &fld->lcf_targets, ft_chain) { if (target->ft_idx == idx) { fld->lcf_count--; - list_del(&target->ft_chain); - spin_unlock(&fld->lcf_lock); + cfs_list_del(&target->ft_chain); + cfs_spin_unlock(&fld->lcf_lock); if (target->ft_exp != NULL) class_export_put(target->ft_exp); @@ -279,13 +272,11 @@ int fld_client_del_target(struct lu_client_fld *fld, RETURN(0); } } - spin_unlock(&fld->lcf_lock); + cfs_spin_unlock(&fld->lcf_lock); RETURN(-ENOENT); } EXPORT_SYMBOL(fld_client_del_target); -static void fld_client_proc_fini(struct lu_client_fld *fld); - #ifdef LPROCFS static int fld_client_proc_init(struct lu_client_fld *fld) { @@ -318,7 +309,7 @@ out_cleanup: return rc; } -static void fld_client_proc_fini(struct lu_client_fld *fld) +void fld_client_proc_fini(struct lu_client_fld *fld) { ENTRY; if (fld->lcf_proc_dir) { @@ -334,12 +325,14 @@ static int fld_client_proc_init(struct lu_client_fld *fld) return 0; } -static void fld_client_proc_fini(struct lu_client_fld *fld) +void fld_client_proc_fini(struct lu_client_fld *fld) { return; } #endif +EXPORT_SYMBOL(fld_client_proc_fini); + static inline int hash_is_sane(int hash) { return (hash >= 0 && hash < ARRAY_SIZE(fld_hash)); @@ -348,9 +341,7 @@ static inline int hash_is_sane(int hash) int fld_client_init(struct lu_client_fld *fld, const char *prefix, int hash) { -#ifdef __KERNEL__ int cache_size, cache_threshold; -#endif int rc; ENTRY; @@ -366,12 +357,11 @@ int fld_client_init(struct lu_client_fld *fld, } fld->lcf_count = 0; - spin_lock_init(&fld->lcf_lock); + cfs_spin_lock_init(&fld->lcf_lock); fld->lcf_hash = &fld_hash[hash]; fld->lcf_flags = LUSTRE_FLD_INIT; CFS_INIT_LIST_HEAD(&fld->lcf_targets); -#ifdef __KERNEL__ cache_size = FLD_CLIENT_CACHE_SIZE / sizeof(struct fld_cache_entry); @@ -379,14 +369,12 @@ int fld_client_init(struct lu_client_fld *fld, FLD_CLIENT_CACHE_THRESHOLD / 100; fld->lcf_cache = fld_cache_init(fld->lcf_name, - FLD_CLIENT_HTABLE_SIZE, cache_size, cache_threshold); if (IS_ERR(fld->lcf_cache)) { rc = PTR_ERR(fld->lcf_cache); fld->lcf_cache = NULL; GOTO(out, rc); } -#endif rc = fld_client_proc_init(fld); if (rc) @@ -407,36 +395,32 @@ void fld_client_fini(struct lu_client_fld *fld) struct lu_fld_target *target, *tmp; ENTRY; - fld_client_proc_fini(fld); - - spin_lock(&fld->lcf_lock); - list_for_each_entry_safe(target, tmp, - &fld->lcf_targets, ft_chain) { + cfs_spin_lock(&fld->lcf_lock); + cfs_list_for_each_entry_safe(target, tmp, + &fld->lcf_targets, ft_chain) { fld->lcf_count--; - list_del(&target->ft_chain); + cfs_list_del(&target->ft_chain); if (target->ft_exp != NULL) class_export_put(target->ft_exp); OBD_FREE_PTR(target); } - spin_unlock(&fld->lcf_lock); + cfs_spin_unlock(&fld->lcf_lock); -#ifdef __KERNEL__ if (fld->lcf_cache != NULL) { if (!IS_ERR(fld->lcf_cache)) fld_cache_fini(fld->lcf_cache); fld->lcf_cache = NULL; } -#endif EXIT; } EXPORT_SYMBOL(fld_client_fini); -static int fld_client_rpc(struct obd_export *exp, - struct md_fld *mf, __u32 fld_op) +int fld_client_rpc(struct obd_export *exp, + struct lu_seq_range *range, __u32 fld_op) { struct ptlrpc_request *req; - struct md_fld *pmf; + struct lu_seq_range *prange; __u32 *op; int rc; ENTRY; @@ -451,11 +435,12 @@ static int fld_client_rpc(struct obd_export *exp, op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC); *op = fld_op; - pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD); - *pmf = *mf; + prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD); + *prange = *range; ptlrpc_request_set_replen(req); req->rq_request_portal = FLD_REQUEST_PORTAL; + ptlrpc_at_set_req_timeout(req); if (fld_op != FLD_LOOKUP) mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); @@ -467,110 +452,31 @@ static int fld_client_rpc(struct obd_export *exp, if (rc) GOTO(out_req, rc); - pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD); - if (pmf == NULL) + prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD); + if (prange == NULL) GOTO(out_req, rc = -EFAULT); - *mf = *pmf; + *range = *prange; EXIT; out_req: ptlrpc_req_finished(req); return rc; } -int fld_client_create(struct lu_client_fld *fld, - seqno_t seq, mdsno_t mds, - const struct lu_env *env) +int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds, + __u32 flags, const struct lu_env *env) { - struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds }; + struct lu_seq_range res; struct lu_fld_target *target; int rc; ENTRY; fld->lcf_flags |= LUSTRE_FLD_RUN; - target = fld_client_get_target(fld, seq); - LASSERT(target != NULL); - - CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: " - LPU64") on target %s (idx "LPU64")\n", fld->lcf_name, - seq, mds, fld_target_name(target), target->ft_idx); - -#ifdef __KERNEL__ - if (target->ft_srv != NULL) { - LASSERT(env != NULL); - rc = fld_server_create(target->ft_srv, env, seq, mds); - } else { -#endif - rc = fld_client_rpc(target->ft_exp, &md_fld, FLD_CREATE); -#ifdef __KERNEL__ - } -#endif + rc = fld_cache_lookup(fld->lcf_cache, seq, &res); if (rc == 0) { - /* - * Do not return result of calling fld_cache_insert() - * here. First of all because it may return -EEXISTS. Another - * reason is that, we do not want to stop proceeding because of - * cache errors. - */ - fld_cache_insert(fld->lcf_cache, seq, mds); - } else { - CERROR("%s: Can't create FLD entry, rc %d\n", - fld->lcf_name, rc); - } - - RETURN(rc); -} -EXPORT_SYMBOL(fld_client_create); - -int fld_client_delete(struct lu_client_fld *fld, seqno_t seq, - const struct lu_env *env) -{ - struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 }; - struct lu_fld_target *target; - int rc; - ENTRY; - - fld->lcf_flags |= LUSTRE_FLD_RUN; - fld_cache_delete(fld->lcf_cache, seq); - - target = fld_client_get_target(fld, seq); - LASSERT(target != NULL); - - CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on " - "target %s (idx "LPU64")\n", fld->lcf_name, seq, - fld_target_name(target), target->ft_idx); - -#ifdef __KERNEL__ - if (target->ft_srv != NULL) { - LASSERT(env != NULL); - rc = fld_server_delete(target->ft_srv, - env, seq); - } else { -#endif - rc = fld_client_rpc(target->ft_exp, - &md_fld, FLD_DELETE); -#ifdef __KERNEL__ - } -#endif - - RETURN(rc); -} -EXPORT_SYMBOL(fld_client_delete); - -int fld_client_lookup(struct lu_client_fld *fld, - seqno_t seq, mdsno_t *mds, - const struct lu_env *env) -{ - struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 }; - struct lu_fld_target *target; - int rc; - ENTRY; - - fld->lcf_flags |= LUSTRE_FLD_RUN; - - rc = fld_cache_lookup(fld->lcf_cache, seq, mds); - if (rc == 0) + *mds = res.lsr_index; RETURN(0); + } /* Can not find it in the cache */ target = fld_client_get_target(fld, seq); @@ -580,35 +486,25 @@ int fld_client_lookup(struct lu_client_fld *fld, "target %s (idx "LPU64")\n", fld->lcf_name, seq, fld_target_name(target), target->ft_idx); + res.lsr_start = seq; + res.lsr_flags = flags; #ifdef __KERNEL__ if (target->ft_srv != NULL) { LASSERT(env != NULL); rc = fld_server_lookup(target->ft_srv, - env, seq, &md_fld.mf_mds); + env, seq, &res); } else { #endif - /* - * insert the 'inflight' sequence. No need to protect that, - * we are trying to reduce numbers of RPC but not restrict - * to them exactly one - */ - fld_cache_insert_inflight(fld->lcf_cache, seq); rc = fld_client_rpc(target->ft_exp, - &md_fld, FLD_LOOKUP); + &res, FLD_LOOKUP); #ifdef __KERNEL__ } #endif + if (rc == 0) { - *mds = md_fld.mf_mds; + *mds = res.lsr_index; - /* - * Do not return error here as well. See previous comment in - * same situation in function fld_client_create(). - */ - fld_cache_insert(fld->lcf_cache, seq, *mds); - } else { - /* remove 'inflight' seq if it exists */ - fld_cache_delete(fld->lcf_cache, seq); + fld_cache_insert(fld->lcf_cache, &res); } RETURN(rc); } @@ -616,9 +512,6 @@ EXPORT_SYMBOL(fld_client_lookup); void fld_client_flush(struct lu_client_fld *fld) { -#ifdef __KERNEL__ fld_cache_flush(fld->lcf_cache); -#endif } EXPORT_SYMBOL(fld_client_flush); -