X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ffld%2Ffld_request.c;h=1999dac1507e386a58cbf3c4491f8a14a7e72a29;hp=2a2a92733a3b37c16c6fc6973eb937eda9ef0204;hb=a68e117a54af82f209aed46a822f8346a3e0703f;hpb=001b8dbfacb747f1649a2eb047a5f118ce32fdc7 diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c index 2a2a927..1999dac 100644 --- a/lustre/fld/fld_request.c +++ b/lustre/fld/fld_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,14 +42,9 @@ #define DEBUG_SUBSYSTEM S_FLD -#ifdef __KERNEL__ -# include -# include -# include -#else /* __KERNEL__ */ -# include -# include -#endif +#include +#include +#include #include #include @@ -60,66 +55,14 @@ #include #include "fld_internal.h" -/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c - * It should be common thing. The same about mdc RPC lock */ -static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw) +static int fld_rrb_hash(struct lu_client_fld *fld, u64 seq) { - int rc; - ENTRY; - client_obd_list_lock(&cli->cl_loi_list_lock); - rc = cfs_list_empty(&mcw->mcw_entry); - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); -}; - -static void fld_enter_request(struct client_obd *cli) -{ - struct mdc_cache_waiter mcw; - struct l_wait_info lwi = { 0 }; - - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); - init_waitqueue_head(&mcw.mcw_waitq); - client_obd_list_unlock(&cli->cl_loi_list_lock); - l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi); - } else { - cli->cl_r_in_flight++; - client_obd_list_unlock(&cli->cl_loi_list_lock); - } -} - -static void fld_exit_request(struct client_obd *cli) -{ - cfs_list_t *l, *tmp; - struct mdc_cache_waiter *mcw; - - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_r_in_flight--; - cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { - - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - /* No free request slots anymore */ - break; - } - - mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry); - cfs_list_del_init(&mcw->mcw_entry); - cli->cl_r_in_flight++; - wake_up(&mcw->mcw_waitq); - } - client_obd_list_unlock(&cli->cl_loi_list_lock); -} - -static int fld_rrb_hash(struct lu_client_fld *fld, - seqno_t seq) -{ - LASSERT(fld->lcf_count > 0); - return do_div(seq, fld->lcf_count); + LASSERT(fld->lcf_count > 0); + return do_div(seq, fld->lcf_count); } static struct lu_fld_target * -fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) +fld_rrb_scan(struct lu_client_fld *fld, u64 seq) { struct lu_fld_target *target; int hash; @@ -135,16 +78,25 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) else hash = 0; - cfs_list_for_each_entry(target, &fld->lcf_targets, ft_chain) { +again: + list_for_each_entry(target, &fld->lcf_targets, ft_chain) { if (target->ft_idx == hash) RETURN(target); } + if (hash != 0) { + /* It is possible the remote target(MDT) are not connected to + * with client yet, so we will refer this to MDT0, which should + * be connected during mount */ + hash = 0; + goto again; + } + CERROR("%s: Can't find target by hash %d (seq "LPX64"). " "Targets (%d):\n", fld->lcf_name, hash, seq, fld->lcf_count); - cfs_list_for_each_entry(target, &fld->lcf_targets, ft_chain) { + list_for_each_entry(target, &fld->lcf_targets, ft_chain) { const char *srv_name = target->ft_srv != NULL ? target->ft_srv->lsf_name : ""; const char *exp_name = target->ft_exp != NULL ? @@ -171,12 +123,12 @@ struct lu_fld_hash fld_hash[] = { .fh_scan_func = fld_rrb_scan }, { - 0, + NULL, } }; static struct lu_fld_target * -fld_client_get_target(struct lu_client_fld *fld, seqno_t seq) +fld_client_get_target(struct lu_client_fld *fld, u64 seq) { struct lu_fld_target *target; ENTRY; @@ -212,22 +164,15 @@ int fld_client_add_target(struct lu_client_fld *fld, LASSERT(name != NULL); LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL); - if (fld->lcf_flags != LUSTRE_FLD_INIT) { - CERROR("%s: Attempt to add target %s (idx "LPU64") " - "on fly - skip it\n", fld->lcf_name, name, - tar->ft_idx); - RETURN(0); - } else { - CDEBUG(D_INFO, "%s: Adding target %s (idx " - LPU64")\n", fld->lcf_name, name, tar->ft_idx); - } + CDEBUG(D_INFO, "%s: Adding target %s (idx "LPU64")\n", fld->lcf_name, + name, tar->ft_idx); OBD_ALLOC_PTR(target); if (target == NULL) RETURN(-ENOMEM); spin_lock(&fld->lcf_lock); - cfs_list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { + list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { if (tmp->ft_idx == tar->ft_idx) { spin_unlock(&fld->lcf_lock); OBD_FREE_PTR(target); @@ -243,8 +188,7 @@ int fld_client_add_target(struct lu_client_fld *fld, target->ft_srv = tar->ft_srv; target->ft_idx = tar->ft_idx; - cfs_list_add_tail(&target->ft_chain, - &fld->lcf_targets); + list_add_tail(&target->ft_chain, &fld->lcf_targets); fld->lcf_count++; spin_unlock(&fld->lcf_lock); @@ -260,11 +204,10 @@ int fld_client_del_target(struct lu_client_fld *fld, __u64 idx) ENTRY; spin_lock(&fld->lcf_lock); - cfs_list_for_each_entry_safe(target, tmp, - &fld->lcf_targets, ft_chain) { + list_for_each_entry_safe(target, tmp, &fld->lcf_targets, ft_chain) { if (target->ft_idx == idx) { fld->lcf_count--; - cfs_list_del(&target->ft_chain); + list_del(&target->ft_chain); spin_unlock(&fld->lcf_lock); if (target->ft_exp != NULL) @@ -277,17 +220,15 @@ int fld_client_del_target(struct lu_client_fld *fld, __u64 idx) spin_unlock(&fld->lcf_lock); RETURN(-ENOENT); } -EXPORT_SYMBOL(fld_client_del_target); -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS static int fld_client_proc_init(struct lu_client_fld *fld) { int rc; ENTRY; - fld->lcf_proc_dir = lprocfs_seq_register(fld->lcf_name, - fld_type_proc_dir, - NULL, NULL); + fld->lcf_proc_dir = lprocfs_register(fld->lcf_name, fld_type_proc_dir, + NULL, NULL); if (IS_ERR(fld->lcf_proc_dir)) { CERROR("%s: LProcFS failed in fld-init\n", fld->lcf_name); @@ -295,8 +236,7 @@ static int fld_client_proc_init(struct lu_client_fld *fld) RETURN(rc); } - rc = lprocfs_seq_add_vars(fld->lcf_proc_dir, - fld_client_proc_list, fld); + rc = lprocfs_add_vars(fld->lcf_proc_dir, fld_client_proc_list, fld); if (rc) { CERROR("%s: Can't init FLD proc, rc %d\n", fld->lcf_name, rc); @@ -320,7 +260,7 @@ void fld_client_proc_fini(struct lu_client_fld *fld) } EXIT; } -#else +#else /* !CONFIG_PROC_FS */ static int fld_client_proc_init(struct lu_client_fld *fld) { return 0; @@ -330,7 +270,7 @@ void fld_client_proc_fini(struct lu_client_fld *fld) { return; } -#endif +#endif /* CONFIG_PROC_FS */ EXPORT_SYMBOL(fld_client_proc_fini); @@ -357,11 +297,10 @@ int fld_client_init(struct lu_client_fld *fld, RETURN(-EINVAL); } - fld->lcf_count = 0; + fld->lcf_count = 0; spin_lock_init(&fld->lcf_lock); - fld->lcf_hash = &fld_hash[hash]; - fld->lcf_flags = LUSTRE_FLD_INIT; - CFS_INIT_LIST_HEAD(&fld->lcf_targets); + fld->lcf_hash = &fld_hash[hash]; + INIT_LIST_HEAD(&fld->lcf_targets); cache_size = FLD_CLIENT_CACHE_SIZE / sizeof(struct fld_cache_entry); @@ -397,10 +336,9 @@ void fld_client_fini(struct lu_client_fld *fld) ENTRY; spin_lock(&fld->lcf_lock); - cfs_list_for_each_entry_safe(target, tmp, - &fld->lcf_targets, ft_chain) { + list_for_each_entry_safe(target, tmp, &fld->lcf_targets, ft_chain) { fld->lcf_count--; - cfs_list_del(&target->ft_chain); + list_del(&target->ft_chain); if (target->ft_exp != NULL) class_export_put(target->ft_exp); OBD_FREE_PTR(target); @@ -477,16 +415,25 @@ again: req->rq_reply_portal = MDC_REPLY_PORTAL; ptlrpc_at_set_req_timeout(req); - fld_enter_request(&exp->exp_obd->u.cli); + obd_get_request_slot(&exp->exp_obd->u.cli); rc = ptlrpc_queue_wait(req); - fld_exit_request(&exp->exp_obd->u.cli); + obd_put_request_slot(&exp->exp_obd->u.cli); + + if (rc == -ENOENT) { + /* Don't loop forever on non-existing FID sequences. */ + GOTO(out_req, rc); + } + if (rc != 0) { - if (rc == -EWOULDBLOCK) { - /* For no_delay req(see above), EWOULDBLOCK means the - * connection is being evicted, but this seq lookup - * should not return error, since it would cause - * unecessary failure of the application, instead - * it should retry here */ + if (imp->imp_state != LUSTRE_IMP_CLOSED && + !imp->imp_deactive && + imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS && + rc != -ENOTSUPP) { + /* Since LWP is not replayable, so it will keep + * trying unless umount happens or the remote + * target does not support the operation, otherwise + * it would cause unecessary failure of the + * application. */ ptlrpc_req_finished(req); rc = 0; goto again; @@ -515,26 +462,26 @@ out_req: return rc; } -int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds, +int fld_client_lookup(struct lu_client_fld *fld, u64 seq, u32 *mds, __u32 flags, const struct lu_env *env) { struct lu_seq_range res = { 0 }; struct lu_fld_target *target; + struct lu_fld_target *origin; int rc; ENTRY; - fld->lcf_flags |= LUSTRE_FLD_RUN; - - rc = fld_cache_lookup(fld->lcf_cache, seq, &res); - if (rc == 0) { - *mds = res.lsr_index; - RETURN(0); - } + rc = fld_cache_lookup(fld->lcf_cache, seq, &res); + if (rc == 0) { + *mds = res.lsr_index; + RETURN(0); + } /* Can not find it in the cache */ target = fld_client_get_target(fld, seq); LASSERT(target != NULL); - + origin = target; +again: CDEBUG(D_INFO, "%s: Lookup fld entry (seq: "LPX64") on " "target %s (idx "LPU64")\n", fld->lcf_name, seq, fld_target_name(target), target->ft_idx); @@ -542,16 +489,37 @@ int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds, res.lsr_start = seq; fld_range_set_type(&res, flags); -#if defined(__KERNEL__) && defined(HAVE_SERVER_SUPPORT) +#ifdef HAVE_SERVER_SUPPORT if (target->ft_srv != NULL) { LASSERT(env != NULL); rc = fld_server_lookup(env, target->ft_srv, seq, &res); } else -#endif +#endif /* HAVE_SERVER_SUPPORT */ { rc = fld_client_rpc(target->ft_exp, &res, FLD_QUERY, NULL); } + if (rc == -ESHUTDOWN) { + /* If fld lookup failed because the target has been shutdown, + * then try next target in the list, until trying all targets + * or fld lookup succeeds */ + spin_lock(&fld->lcf_lock); + + /* If the next entry in the list is the head of the list, + * move to the next entry after the head and retrieve + * the target. Else retreive the next target entry. */ + + if (target->ft_chain.next == &fld->lcf_targets) + target = list_entry(target->ft_chain.next->next, + struct lu_fld_target, ft_chain); + else + target = list_entry(target->ft_chain.next, + struct lu_fld_target, + ft_chain); + spin_unlock(&fld->lcf_lock); + if (target != origin) + goto again; + } if (rc == 0) { *mds = res.lsr_index; fld_cache_insert(fld->lcf_cache, &res); @@ -565,32 +533,30 @@ void fld_client_flush(struct lu_client_fld *fld) { fld_cache_flush(fld->lcf_cache); } -EXPORT_SYMBOL(fld_client_flush); -#ifdef __KERNEL__ struct proc_dir_entry *fld_type_proc_dir; -static int __init fld_mod_init(void) +static int __init fld_init(void) { - fld_type_proc_dir = lprocfs_seq_register(LUSTRE_FLD_NAME, - proc_lustre_root, - NULL, NULL); + fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME, + proc_lustre_root, + NULL, NULL); if (IS_ERR(fld_type_proc_dir)) return PTR_ERR(fld_type_proc_dir); #ifdef HAVE_SERVER_SUPPORT fld_server_mod_init(); -#endif +#endif /* HAVE_SERVER_SUPPORT */ return 0; } -static void __exit fld_mod_exit(void) +static void __exit fld_exit(void) { #ifdef HAVE_SERVER_SUPPORT fld_server_mod_exit(); -#endif +#endif /* HAVE_SERVER_SUPPORT */ if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) { lprocfs_remove(&fld_type_proc_dir); @@ -598,9 +564,10 @@ static void __exit fld_mod_exit(void) } } -MODULE_AUTHOR("Sun Microsystems, Inc. "); -MODULE_DESCRIPTION("Lustre FLD"); +MODULE_AUTHOR("OpenSFS, Inc. "); +MODULE_DESCRIPTION("Lustre FID Location Database"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); -cfs_module(mdd, LUSTRE_VERSION_STRING, fld_mod_init, fld_mod_exit); -#endif /* __KERNEL__ */ +module_init(fld_init); +module_exit(fld_exit);