X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ffld%2Ffld_request.c;h=93549226e0dace033b057774f64e7e65942cd356;hp=39fb13b2d85588dddfaa356bfbd7c27cc84b2201;hb=8d4ef45e078010d98f5c4f786e551d487f3e6e18;hpb=fbf5870b9848929d352460f1f005b79c0b5ccc5a diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c index 39fb13b..9354922 100644 --- a/lustre/fld/fld_request.c +++ b/lustre/fld/fld_request.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,8 +24,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -40,111 +40,63 @@ * Author: Yury Umanets */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_FLD -#ifdef __KERNEL__ -# include -# include -# include -# include -#else /* __KERNEL__ */ -# include -# include -#endif +#include +#include +#include #include #include -#include #include #include - -#include -#include #include #include #include #include "fld_internal.h" -/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c - * It should be common thing. The same about mdc RPC lock */ -static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw) -{ - int rc; - ENTRY; - spin_lock(&cli->cl_loi_list_lock); - rc = list_empty(&mcw->mcw_entry); - spin_unlock(&cli->cl_loi_list_lock); - RETURN(rc); -}; - -static void fld_enter_request(struct client_obd *cli) +static int fld_rrb_hash(struct lu_client_fld *fld, u64 seq) { - struct mdc_cache_waiter mcw; - struct l_wait_info lwi = { 0 }; - - spin_lock(&cli->cl_loi_list_lock); - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); - cfs_waitq_init(&mcw.mcw_waitq); - spin_unlock(&cli->cl_loi_list_lock); - l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi); - } else { - cli->cl_r_in_flight++; - spin_unlock(&cli->cl_loi_list_lock); - } -} - -static void fld_exit_request(struct client_obd *cli) -{ - struct list_head *l, *tmp; - struct mdc_cache_waiter *mcw; - - spin_lock(&cli->cl_loi_list_lock); - cli->cl_r_in_flight--; - list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { - - if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { - /* No free request slots anymore */ - break; - } - - mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry); - list_del_init(&mcw->mcw_entry); - cli->cl_r_in_flight++; - cfs_waitq_signal(&mcw->mcw_waitq); - } - spin_unlock(&cli->cl_loi_list_lock); -} - -static int fld_rrb_hash(struct lu_client_fld *fld, - seqno_t seq) -{ - LASSERT(fld->lcf_count > 0); - return do_div(seq, fld->lcf_count); + LASSERT(fld->lcf_count > 0); + return do_div(seq, fld->lcf_count); } static struct lu_fld_target * -fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) +fld_rrb_scan(struct lu_client_fld *fld, u64 seq) { struct lu_fld_target *target; int hash; ENTRY; - hash = fld_rrb_hash(fld, seq); - - list_for_each_entry(target, &fld->lcf_targets, ft_chain) { + /* Because almost all of special sequence located in MDT0, + * it should go to index 0 directly, instead of calculating + * hash again, and also if other MDTs is not being connected, + * the fld lookup requests(for seq on MDT0) should not be + * blocked because of other MDTs */ + if (fid_seq_is_norm(seq)) + hash = fld_rrb_hash(fld, seq); + else + hash = 0; + +again: + list_for_each_entry(target, &fld->lcf_targets, ft_chain) { if (target->ft_idx == hash) RETURN(target); } + if (hash != 0) { + /* It is possible the remote target(MDT) are not connected to + * with client yet, so we will refer this to MDT0, which should + * be connected during mount */ + hash = 0; + goto again; + } + CERROR("%s: Can't find target by hash %d (seq "LPX64"). " "Targets (%d):\n", fld->lcf_name, hash, seq, fld->lcf_count); - list_for_each_entry(target, &fld->lcf_targets, ft_chain) { + list_for_each_entry(target, &fld->lcf_targets, ft_chain) { const char *srv_name = target->ft_srv != NULL ? target->ft_srv->lsf_name : ""; const char *exp_name = target->ft_exp != NULL ? @@ -164,48 +116,28 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) RETURN(NULL); } -static int fld_dht_hash(struct lu_client_fld *fld, - seqno_t seq) -{ - /* XXX: here should be DHT hash */ - return fld_rrb_hash(fld, seq); -} - -static struct lu_fld_target * -fld_dht_scan(struct lu_client_fld *fld, seqno_t seq) -{ - /* XXX: here should be DHT scan code */ - return fld_rrb_scan(fld, seq); -} - -struct lu_fld_hash fld_hash[3] = { - { - .fh_name = "DHT", - .fh_hash_func = fld_dht_hash, - .fh_scan_func = fld_dht_scan - }, +struct lu_fld_hash fld_hash[] = { { .fh_name = "RRB", .fh_hash_func = fld_rrb_hash, .fh_scan_func = fld_rrb_scan }, { - 0, + NULL, } }; static struct lu_fld_target * -fld_client_get_target(struct lu_client_fld *fld, - seqno_t seq) +fld_client_get_target(struct lu_client_fld *fld, u64 seq) { - struct lu_fld_target *target; - ENTRY; + struct lu_fld_target *target; + ENTRY; - LASSERT(fld->lcf_hash != NULL); + LASSERT(fld->lcf_hash != NULL); - spin_lock(&fld->lcf_lock); - target = fld->lcf_hash->fh_scan_func(fld, seq); - spin_unlock(&fld->lcf_lock); + spin_lock(&fld->lcf_lock); + target = fld->lcf_hash->fh_scan_func(fld, seq); + spin_unlock(&fld->lcf_lock); if (target != NULL) { CDEBUG(D_INFO, "%s: Found target (idx "LPU64 @@ -223,32 +155,26 @@ fld_client_get_target(struct lu_client_fld *fld, int fld_client_add_target(struct lu_client_fld *fld, struct lu_fld_target *tar) { - const char *name = fld_target_name(tar); + const char *name; struct lu_fld_target *target, *tmp; ENTRY; LASSERT(tar != NULL); + name = fld_target_name(tar); LASSERT(name != NULL); LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL); - if (fld->lcf_flags != LUSTRE_FLD_INIT) { - CERROR("%s: Attempt to add target %s (idx "LPU64") " - "on fly - skip it\n", fld->lcf_name, name, - tar->ft_idx); - RETURN(0); - } else { - CDEBUG(D_INFO, "%s: Adding target %s (idx " - LPU64")\n", fld->lcf_name, name, tar->ft_idx); - } + CDEBUG(D_INFO, "%s: Adding target %s (idx "LPU64")\n", fld->lcf_name, + name, tar->ft_idx); OBD_ALLOC_PTR(target); if (target == NULL) RETURN(-ENOMEM); - spin_lock(&fld->lcf_lock); - list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { - if (tmp->ft_idx == tar->ft_idx) { - spin_unlock(&fld->lcf_lock); + spin_lock(&fld->lcf_lock); + list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { + if (tmp->ft_idx == tar->ft_idx) { + spin_unlock(&fld->lcf_lock); OBD_FREE_PTR(target); CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n", name, fld_target_name(tmp), tmp->ft_idx); @@ -262,30 +188,27 @@ int fld_client_add_target(struct lu_client_fld *fld, target->ft_srv = tar->ft_srv; target->ft_idx = tar->ft_idx; - list_add_tail(&target->ft_chain, - &fld->lcf_targets); + list_add_tail(&target->ft_chain, &fld->lcf_targets); fld->lcf_count++; - spin_unlock(&fld->lcf_lock); + spin_unlock(&fld->lcf_lock); - RETURN(0); + RETURN(0); } EXPORT_SYMBOL(fld_client_add_target); /* Remove export from FLD */ -int fld_client_del_target(struct lu_client_fld *fld, - __u64 idx) +int fld_client_del_target(struct lu_client_fld *fld, __u64 idx) { - struct lu_fld_target *target, *tmp; - ENTRY; + struct lu_fld_target *target, *tmp; + ENTRY; - spin_lock(&fld->lcf_lock); - list_for_each_entry_safe(target, tmp, - &fld->lcf_targets, ft_chain) { - if (target->ft_idx == idx) { - fld->lcf_count--; - list_del(&target->ft_chain); - spin_unlock(&fld->lcf_lock); + spin_lock(&fld->lcf_lock); + list_for_each_entry_safe(target, tmp, &fld->lcf_targets, ft_chain) { + if (target->ft_idx == idx) { + fld->lcf_count--; + list_del(&target->ft_chain); + spin_unlock(&fld->lcf_lock); if (target->ft_exp != NULL) class_export_put(target->ft_exp); @@ -294,46 +217,40 @@ int fld_client_del_target(struct lu_client_fld *fld, RETURN(0); } } - spin_unlock(&fld->lcf_lock); - RETURN(-ENOENT); + spin_unlock(&fld->lcf_lock); + RETURN(-ENOENT); } -EXPORT_SYMBOL(fld_client_del_target); -static void fld_client_proc_fini(struct lu_client_fld *fld); - -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS static int fld_client_proc_init(struct lu_client_fld *fld) { - int rc; - ENTRY; - - fld->lcf_proc_dir = lprocfs_register(fld->lcf_name, - fld_type_proc_dir, - NULL, NULL); - - if (IS_ERR(fld->lcf_proc_dir)) { - CERROR("%s: LProcFS failed in fld-init\n", - fld->lcf_name); - rc = PTR_ERR(fld->lcf_proc_dir); - RETURN(rc); - } - - rc = lprocfs_add_vars(fld->lcf_proc_dir, - fld_client_proc_list, fld); - if (rc) { - CERROR("%s: Can't init FLD proc, rc %d\n", - fld->lcf_name, rc); - GOTO(out_cleanup, rc); - } - - RETURN(0); + int rc; + ENTRY; + + fld->lcf_proc_dir = lprocfs_register(fld->lcf_name, fld_type_proc_dir, + NULL, NULL); + if (IS_ERR(fld->lcf_proc_dir)) { + CERROR("%s: LProcFS failed in fld-init\n", + fld->lcf_name); + rc = PTR_ERR(fld->lcf_proc_dir); + RETURN(rc); + } + + rc = lprocfs_add_vars(fld->lcf_proc_dir, fld_client_proc_list, fld); + if (rc) { + CERROR("%s: Can't init FLD proc, rc %d\n", + fld->lcf_name, rc); + GOTO(out_cleanup, rc); + } + + RETURN(0); out_cleanup: - fld_client_proc_fini(fld); - return rc; + fld_client_proc_fini(fld); + return rc; } -static void fld_client_proc_fini(struct lu_client_fld *fld) +void fld_client_proc_fini(struct lu_client_fld *fld) { ENTRY; if (fld->lcf_proc_dir) { @@ -343,17 +260,19 @@ static void fld_client_proc_fini(struct lu_client_fld *fld) } EXIT; } -#else +#else /* !CONFIG_PROC_FS */ static int fld_client_proc_init(struct lu_client_fld *fld) { return 0; } -static void fld_client_proc_fini(struct lu_client_fld *fld) +void fld_client_proc_fini(struct lu_client_fld *fld) { return; } -#endif +#endif /* CONFIG_PROC_FS */ + +EXPORT_SYMBOL(fld_client_proc_fini); static inline int hash_is_sane(int hash) { @@ -363,9 +282,7 @@ static inline int hash_is_sane(int hash) int fld_client_init(struct lu_client_fld *fld, const char *prefix, int hash) { -#ifdef __KERNEL__ int cache_size, cache_threshold; -#endif int rc; ENTRY; @@ -380,13 +297,11 @@ int fld_client_init(struct lu_client_fld *fld, RETURN(-EINVAL); } - fld->lcf_count = 0; - spin_lock_init(&fld->lcf_lock); - fld->lcf_hash = &fld_hash[hash]; - fld->lcf_flags = LUSTRE_FLD_INIT; - CFS_INIT_LIST_HEAD(&fld->lcf_targets); + fld->lcf_count = 0; + spin_lock_init(&fld->lcf_lock); + fld->lcf_hash = &fld_hash[hash]; + INIT_LIST_HEAD(&fld->lcf_targets); -#ifdef __KERNEL__ cache_size = FLD_CLIENT_CACHE_SIZE / sizeof(struct fld_cache_entry); @@ -394,14 +309,12 @@ int fld_client_init(struct lu_client_fld *fld, FLD_CLIENT_CACHE_THRESHOLD / 100; fld->lcf_cache = fld_cache_init(fld->lcf_name, - FLD_CLIENT_HTABLE_SIZE, cache_size, cache_threshold); if (IS_ERR(fld->lcf_cache)) { rc = PTR_ERR(fld->lcf_cache); fld->lcf_cache = NULL; GOTO(out, rc); } -#endif rc = fld_client_proc_init(fld); if (rc) @@ -419,231 +332,237 @@ EXPORT_SYMBOL(fld_client_init); void fld_client_fini(struct lu_client_fld *fld) { - struct lu_fld_target *target, *tmp; - ENTRY; + struct lu_fld_target *target, *tmp; + ENTRY; - fld_client_proc_fini(fld); - - spin_lock(&fld->lcf_lock); - list_for_each_entry_safe(target, tmp, - &fld->lcf_targets, ft_chain) { + spin_lock(&fld->lcf_lock); + list_for_each_entry_safe(target, tmp, &fld->lcf_targets, ft_chain) { fld->lcf_count--; - list_del(&target->ft_chain); + list_del(&target->ft_chain); if (target->ft_exp != NULL) class_export_put(target->ft_exp); OBD_FREE_PTR(target); } - spin_unlock(&fld->lcf_lock); + spin_unlock(&fld->lcf_lock); -#ifdef __KERNEL__ if (fld->lcf_cache != NULL) { if (!IS_ERR(fld->lcf_cache)) fld_cache_fini(fld->lcf_cache); fld->lcf_cache = NULL; } -#endif EXIT; } EXPORT_SYMBOL(fld_client_fini); -static int fld_client_rpc(struct obd_export *exp, - struct md_fld *mf, __u32 fld_op) +int fld_client_rpc(struct obd_export *exp, + struct lu_seq_range *range, __u32 fld_op, + struct ptlrpc_request **reqp) { - struct ptlrpc_request *req; - struct md_fld *pmf; - __u32 *op; - int rc; - ENTRY; - - LASSERT(exp != NULL); - - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_FLD_QUERY, - LUSTRE_MDS_VERSION, FLD_QUERY); - if (req == NULL) - RETURN(-ENOMEM); - - op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC); - *op = fld_op; - - pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD); - *pmf = *mf; - - ptlrpc_request_set_replen(req); + struct ptlrpc_request *req = NULL; + struct lu_seq_range *prange; + __u32 *op; + int rc = 0; + struct obd_import *imp; + ENTRY; + + LASSERT(exp != NULL); + +again: + imp = class_exp2cliimp(exp); + switch (fld_op) { + case FLD_QUERY: + req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_QUERY, + LUSTRE_MDS_VERSION, FLD_QUERY); + if (req == NULL) + RETURN(-ENOMEM); + + /* XXX: only needed when talking to old server(< 2.6), it should + * be removed when < 2.6 server is not supported */ + op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC); + *op = FLD_LOOKUP; + + /* For MDS_MDS seq lookup, it will always use LWP connection, + * but LWP will be evicted after restart, so cause the error. + * so we will set no_delay for seq lookup request, once the + * request fails because of the eviction. always retry here */ + if (imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) { + req->rq_allow_replay = 1; + req->rq_no_delay = 1; + } + break; + case FLD_READ: + req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_READ, + LUSTRE_MDS_VERSION, FLD_READ); + if (req == NULL) + RETURN(-ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, + RCL_SERVER, PAGE_CACHE_SIZE); + break; + default: + rc = -EINVAL; + break; + } + + if (rc != 0) + RETURN(rc); + + prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD); + *prange = *range; + ptlrpc_request_set_replen(req); req->rq_request_portal = FLD_REQUEST_PORTAL; + req->rq_reply_portal = MDC_REPLY_PORTAL; ptlrpc_at_set_req_timeout(req); - if (fld_op != FLD_LOOKUP) - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - fld_enter_request(&exp->exp_obd->u.cli); - rc = ptlrpc_queue_wait(req); - fld_exit_request(&exp->exp_obd->u.cli); - if (fld_op != FLD_LOOKUP) - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - if (rc) - GOTO(out_req, rc); - - pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD); - if (pmf == NULL) - GOTO(out_req, rc = -EFAULT); - *mf = *pmf; - EXIT; + obd_get_request_slot(&exp->exp_obd->u.cli); + rc = ptlrpc_queue_wait(req); + obd_put_request_slot(&exp->exp_obd->u.cli); + + if (rc == -ENOENT) { + /* Don't loop forever on non-existing FID sequences. */ + GOTO(out_req, rc); + } + + if (rc != 0) { + if (imp->imp_state != LUSTRE_IMP_CLOSED && !imp->imp_deactive) { + /* Since LWP is not replayable, so it will keep + * trying unless umount happens, otherwise it would + * cause unecessary failure of the application. */ + ptlrpc_req_finished(req); + rc = 0; + goto again; + } + GOTO(out_req, rc); + } + + if (fld_op == FLD_QUERY) { + prange = req_capsule_server_get(&req->rq_pill, + &RMF_FLD_MDFLD); + if (prange == NULL) + GOTO(out_req, rc = -EFAULT); + *range = *prange; + } + + EXIT; out_req: - ptlrpc_req_finished(req); - return rc; -} - -int fld_client_create(struct lu_client_fld *fld, - seqno_t seq, mdsno_t mds, - const struct lu_env *env) -{ - struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds }; - struct lu_fld_target *target; - int rc; - ENTRY; + if (rc != 0 || reqp == NULL) { + ptlrpc_req_finished(req); + req = NULL; + } - fld->lcf_flags |= LUSTRE_FLD_RUN; - target = fld_client_get_target(fld, seq); - LASSERT(target != NULL); - - CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: " - LPU64") on target %s (idx "LPU64")\n", fld->lcf_name, - seq, mds, fld_target_name(target), target->ft_idx); - -#ifdef __KERNEL__ - if (target->ft_srv != NULL) { - LASSERT(env != NULL); - rc = fld_server_create(target->ft_srv, env, seq, mds); - } else { -#endif - rc = fld_client_rpc(target->ft_exp, &md_fld, FLD_CREATE); -#ifdef __KERNEL__ - } -#endif - - if (rc == 0) { - /* - * Do not return result of calling fld_cache_insert() - * here. First of all because it may return -EEXIST. Another - * reason is that, we do not want to stop proceeding because of - * cache errors. - */ - fld_cache_insert(fld->lcf_cache, seq, mds); - } else { - CERROR("%s: Can't create FLD entry, rc %d\n", - fld->lcf_name, rc); - } + if (reqp != NULL) + *reqp = req; - RETURN(rc); + return rc; } -EXPORT_SYMBOL(fld_client_create); -int fld_client_delete(struct lu_client_fld *fld, seqno_t seq, - const struct lu_env *env) +int fld_client_lookup(struct lu_client_fld *fld, u64 seq, u32 *mds, + __u32 flags, const struct lu_env *env) { - struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 }; - struct lu_fld_target *target; - int rc; - ENTRY; - - fld->lcf_flags |= LUSTRE_FLD_RUN; - fld_cache_delete(fld->lcf_cache, seq); + struct lu_seq_range res = { 0 }; + struct lu_fld_target *target; + struct lu_fld_target *origin; + int rc; + ENTRY; + + rc = fld_cache_lookup(fld->lcf_cache, seq, &res); + if (rc == 0) { + *mds = res.lsr_index; + RETURN(0); + } + /* Can not find it in the cache */ target = fld_client_get_target(fld, seq); LASSERT(target != NULL); - - CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on " + origin = target; +again: + CDEBUG(D_INFO, "%s: Lookup fld entry (seq: "LPX64") on " "target %s (idx "LPU64")\n", fld->lcf_name, seq, fld_target_name(target), target->ft_idx); -#ifdef __KERNEL__ - if (target->ft_srv != NULL) { - LASSERT(env != NULL); - rc = fld_server_delete(target->ft_srv, - env, seq); - } else { -#endif - rc = fld_client_rpc(target->ft_exp, - &md_fld, FLD_DELETE); -#ifdef __KERNEL__ - } -#endif - - RETURN(rc); + res.lsr_start = seq; + fld_range_set_type(&res, flags); + +#ifdef HAVE_SERVER_SUPPORT + if (target->ft_srv != NULL) { + LASSERT(env != NULL); + rc = fld_server_lookup(env, target->ft_srv, seq, &res); + } else +#endif /* HAVE_SERVER_SUPPORT */ + { + rc = fld_client_rpc(target->ft_exp, &res, FLD_QUERY, NULL); + } + + if (rc == -ESHUTDOWN) { + /* If fld lookup failed because the target has been shutdown, + * then try next target in the list, until trying all targets + * or fld lookup succeeds */ + spin_lock(&fld->lcf_lock); + + /* If the next entry in the list is the head of the list, + * move to the next entry after the head and retrieve + * the target. Else retreive the next target entry. */ + + if (target->ft_chain.next == &fld->lcf_targets) + target = list_entry(target->ft_chain.next->next, + struct lu_fld_target, ft_chain); + else + target = list_entry(target->ft_chain.next, + struct lu_fld_target, + ft_chain); + spin_unlock(&fld->lcf_lock); + if (target != origin) + goto again; + } + if (rc == 0) { + *mds = res.lsr_index; + fld_cache_insert(fld->lcf_cache, &res); + } + + RETURN(rc); } -EXPORT_SYMBOL(fld_client_delete); +EXPORT_SYMBOL(fld_client_lookup); -int fld_client_lookup(struct lu_client_fld *fld, - seqno_t seq, mdsno_t *mds, - const struct lu_env *env) +void fld_client_flush(struct lu_client_fld *fld) { - struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 }; - struct lu_fld_target *target; - int rc; - ENTRY; - - fld->lcf_flags |= LUSTRE_FLD_RUN; + fld_cache_flush(fld->lcf_cache); +} - rc = fld_cache_lookup(fld->lcf_cache, seq, mds); - if (rc == 0) - RETURN(0); - /* Can not find it in the cache */ - target = fld_client_get_target(fld, seq); - LASSERT(target != NULL); +struct proc_dir_entry *fld_type_proc_dir; - CDEBUG(D_INFO, "%s: Lookup fld entry (seq: "LPX64") on " - "target %s (idx "LPU64")\n", fld->lcf_name, seq, - fld_target_name(target), target->ft_idx); +static int __init fld_init(void) +{ + fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME, + proc_lustre_root, + NULL, NULL); + if (IS_ERR(fld_type_proc_dir)) + return PTR_ERR(fld_type_proc_dir); -#ifdef __KERNEL__ - if (target->ft_srv != NULL) { - LASSERT(env != NULL); - rc = fld_server_lookup(target->ft_srv, - env, seq, &md_fld.mf_mds); - } else { -#endif - /* - * insert the 'inflight' sequence. No need to protect that, - * we are trying to reduce numbers of RPC but not restrict - * to them exactly one - */ - fld_cache_insert_inflight(fld->lcf_cache, seq); - rc = fld_client_rpc(target->ft_exp, - &md_fld, FLD_LOOKUP); -#ifdef __KERNEL__ - } -#endif - if (seq < FID_SEQ_START) { - /* - * The current solution for IGIF is to bind it to mds0. - * In the future, this should be fixed once IGIF can be found - * in FLD. - */ - md_fld.mf_mds = 0; - rc = 0; - } +#ifdef HAVE_SERVER_SUPPORT + fld_server_mod_init(); +#endif /* HAVE_SERVER_SUPPORT */ - if (rc == 0) { - *mds = md_fld.mf_mds; - - /* - * Do not return error here as well. See previous comment in - * same situation in function fld_client_create(). - */ - fld_cache_insert(fld->lcf_cache, seq, *mds); - } else { - /* remove 'inflight' seq if it exists */ - fld_cache_delete(fld->lcf_cache, seq); - } - RETURN(rc); + return 0; } -EXPORT_SYMBOL(fld_client_lookup); -void fld_client_flush(struct lu_client_fld *fld) +static void __exit fld_exit(void) { -#ifdef __KERNEL__ - fld_cache_flush(fld->lcf_cache); -#endif +#ifdef HAVE_SERVER_SUPPORT + fld_server_mod_exit(); +#endif /* HAVE_SERVER_SUPPORT */ + + if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) { + lprocfs_remove(&fld_type_proc_dir); + fld_type_proc_dir = NULL; + } } -EXPORT_SYMBOL(fld_client_flush); + +MODULE_AUTHOR("OpenSFS, Inc. "); +MODULE_DESCRIPTION("Lustre FID Location Database"); +MODULE_VERSION(LUSTRE_VERSION_STRING); +MODULE_LICENSE("GPL"); + +module_init(fld_init); +module_exit(fld_exit);