/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * lustre/fld/fld_handler.c
+ * lustre/fld/fld_request.c
* FLD (Fids Location Database)
*
* Copyright (C) 2006 Cluster File Systems, Inc.
#include <lustre_fld.h>
#include "fld_internal.h"
-#ifdef __KERNEL__
-static __u32
-fld_cache_hash(__u64 seq)
+static int fld_rrb_hash(struct lu_client_fld *fld,
+ seqno_t seq)
{
- return (__u32)seq;
+ LASSERT(fld->lcf_count > 0);
+ return do_div(seq, fld->lcf_count);
}
-static int
-fld_cache_insert(struct fld_cache_info *fld_cache,
- __u64 seq, __u64 mds)
+static struct lu_fld_target *
+fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
{
- struct fld_cache_entry *flde, *fldt;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- int rc = 0;
+ struct lu_fld_target *target;
+ int hash;
ENTRY;
- OBD_ALLOC_PTR(flde);
- if (!flde)
- RETURN(-ENOMEM);
-
- bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
- fld_cache->fci_hash_mask);
-
- spin_lock(&fld_cache->fci_lock);
- hlist_for_each_entry(fldt, scan, bucket, fce_list) {
- if (fldt->fce_seq == seq)
- GOTO(exit_unlock, rc = -EEXIST);
- }
-
- INIT_HLIST_NODE(&flde->fce_list);
- flde->fce_mds = mds;
- flde->fce_seq = seq;
-
- hlist_add_head(&flde->fce_list, bucket);
+ hash = fld_rrb_hash(fld, seq);
- EXIT;
-exit_unlock:
- spin_unlock(&fld_cache->fci_lock);
- if (rc != 0)
- OBD_FREE_PTR(flde);
- return rc;
-}
-
-static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
-{
- struct fld_cache_entry *flde;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- ENTRY;
-
- bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
- fld_cache->fci_hash_mask);
-
- spin_lock(&fld_cache->fci_lock);
- hlist_for_each_entry(flde, scan, bucket, fce_list) {
- if (flde->fce_seq == seq) {
- hlist_del_init(&flde->fce_list);
- GOTO(out_unlock, 0);
- }
+ list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
+ if (target->ft_idx == hash)
+ RETURN(target);
}
- EXIT;
-out_unlock:
- spin_unlock(&fld_cache->fci_lock);
- return;
-}
-
-static struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
-{
- struct fld_cache_entry *flde;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- ENTRY;
-
- bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
- fld_cache->fci_hash_mask);
-
- spin_lock(&fld_cache->fci_lock);
- hlist_for_each_entry(flde, scan, bucket, fce_list) {
- if (flde->fce_seq == seq) {
- spin_unlock(&fld_cache->fci_lock);
- RETURN(flde);
- }
- }
- spin_unlock(&fld_cache->fci_lock);
+ /*
+ * If target is not found, there is logical error anyway, so here is
+ * LBUG() to catch this situation.
+ */
+ LBUG();
RETURN(NULL);
}
-#endif
-static int
-fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
+static int fld_dht_hash(struct lu_client_fld *fld,
+ seqno_t seq)
{
- if (fld->fld_count == 0)
- return 0;
-
- return do_div(seq, fld->fld_count);
+ /* XXX: here should be DHT hash */
+ return fld_rrb_hash(fld, seq);
}
-static int
-fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
+static struct lu_fld_target *
+fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
{
- /* XXX: here should DHT hash */
- return fld_rrb_hash(fld, seq);
+ /* XXX: here should be DHT scan code */
+ return fld_rrb_scan(fld, seq);
}
struct lu_fld_hash fld_hash[3] = {
{
.fh_name = "DHT",
- .fh_func = fld_dht_hash
+ .fh_hash_func = fld_dht_hash,
+ .fh_scan_func = fld_dht_scan
},
{
- .fh_name = "Round Robin",
- .fh_func = fld_rrb_hash
+ .fh_name = "RRB",
+ .fh_hash_func = fld_rrb_hash,
+ .fh_scan_func = fld_rrb_scan
},
{
0,
}
};
-static struct obd_export *
-fld_client_get_target(struct lu_client_fld *fld, __u64 seq)
+static struct lu_fld_target *
+fld_client_get_target(struct lu_client_fld *fld,
+ seqno_t seq)
{
- struct obd_export *fld_exp;
- int count = 0, hash;
+ struct lu_fld_target *target;
ENTRY;
- LASSERT(fld->fld_hash != NULL);
+ LASSERT(fld->lcf_hash != NULL);
- spin_lock(&fld->fld_lock);
- hash = fld->fld_hash->fh_func(fld, seq);
- spin_unlock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
+ target = fld->lcf_hash->fh_scan_func(fld, seq);
+ spin_unlock(&fld->lcf_lock);
- spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp,
- &fld->fld_exports, exp_fld_chain) {
- if (count == hash) {
- spin_unlock(&fld->fld_lock);
- RETURN(fld_exp);
- }
- count++;
- }
- spin_unlock(&fld->fld_lock);
- RETURN(NULL);
+ RETURN(target);
}
-/* add export to FLD. This is usually done by CMM and LMV as they are main users
- * of FLD module. */
-int
-fld_client_add_target(struct lu_client_fld *fld,
- struct obd_export *exp)
+/*
+ * Add export to FLD. This is usually done by CMM and LMV as they are main users
+ * of FLD module.
+ */
+int fld_client_add_target(struct lu_client_fld *fld,
+ struct lu_fld_target *tar)
{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- struct obd_export *fld_exp;
+ const char *name = fld_target_name(tar);
+ struct lu_fld_target *target, *tmp;
ENTRY;
- LASSERT(exp != NULL);
+ LASSERT(tar != NULL);
+ LASSERT(name != NULL);
+ LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
+
+ CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s\n",
+ fld->lcf_name, name);
- CDEBUG(D_INFO|D_WARNING, "FLD(cli): adding export %s\n",
- cli->cl_target_uuid.uuid);
-
- spin_lock(&fld->fld_lock);
- list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
- if (obd_uuid_equals(&fld_exp->exp_client_uuid,
- &exp->exp_client_uuid))
- {
- spin_unlock(&fld->fld_lock);
+ OBD_ALLOC_PTR(target);
+ if (target == NULL)
+ RETURN(-ENOMEM);
+
+ spin_lock(&fld->lcf_lock);
+ list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
+ if (tmp->ft_idx == tar->ft_idx) {
+ spin_unlock(&fld->lcf_lock);
+ OBD_FREE_PTR(target);
+ CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
+ name, fld_target_name(tmp), tmp->ft_idx);
RETURN(-EEXIST);
}
}
-
- fld_exp = class_export_get(exp);
- list_add_tail(&fld_exp->exp_fld_chain,
- &fld->fld_exports);
- fld->fld_count++;
-
- spin_unlock(&fld->fld_lock);
-
+
+ target->ft_exp = tar->ft_exp;
+ if (target->ft_exp != NULL)
+ class_export_get(target->ft_exp);
+ target->ft_srv = tar->ft_srv;
+ target->ft_idx = tar->ft_idx;
+
+ list_add_tail(&target->ft_chain,
+ &fld->lcf_targets);
+
+ fld->lcf_count++;
+ spin_unlock(&fld->lcf_lock);
+
RETURN(0);
}
EXPORT_SYMBOL(fld_client_add_target);
-/* remove export from FLD */
-int
-fld_client_del_target(struct lu_client_fld *fld,
- struct obd_export *exp)
+/* Remove export from FLD */
+int fld_client_del_target(struct lu_client_fld *fld,
+ __u64 idx)
{
- struct obd_export *fld_exp;
- struct obd_export *tmp;
+ struct lu_fld_target *target, *tmp;
ENTRY;
- spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
- if (obd_uuid_equals(&fld_exp->exp_client_uuid,
- &exp->exp_client_uuid))
- {
- fld->fld_count--;
- list_del(&fld_exp->exp_fld_chain);
- class_export_get(fld_exp);
+ spin_lock(&fld->lcf_lock);
+ list_for_each_entry_safe(target, tmp,
+ &fld->lcf_targets, ft_chain) {
+ if (target->ft_idx == idx) {
+ fld->lcf_count--;
+ list_del(&target->ft_chain);
+ spin_unlock(&fld->lcf_lock);
- spin_unlock(&fld->fld_lock);
+ if (target->ft_exp != NULL)
+ class_export_put(target->ft_exp);
+
+ OBD_FREE_PTR(target);
RETURN(0);
}
}
- spin_unlock(&fld->fld_lock);
+ spin_unlock(&fld->lcf_lock);
RETURN(-ENOENT);
}
EXPORT_SYMBOL(fld_client_del_target);
+static void fld_client_proc_fini(struct lu_client_fld *fld);
+
#ifdef LPROCFS
-static int
-fld_client_proc_init(struct lu_client_fld *fld)
+static int fld_client_proc_init(struct lu_client_fld *fld)
{
int rc;
ENTRY;
- fld->fld_proc_dir = lprocfs_register(fld->fld_name,
- proc_lustre_root,
+ fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
+ fld_type_proc_dir,
NULL, NULL);
-
- if (IS_ERR(fld->fld_proc_dir)) {
- CERROR("LProcFS failed in fld-init\n");
- rc = PTR_ERR(fld->fld_proc_dir);
- GOTO(err, rc);
+
+ if (IS_ERR(fld->lcf_proc_dir)) {
+ CERROR("%s: LProcFS failed in fld-init\n",
+ fld->lcf_name);
+ rc = PTR_ERR(fld->lcf_proc_dir);
+ RETURN(rc);
}
- rc = lprocfs_add_vars(fld->fld_proc_dir,
+ rc = lprocfs_add_vars(fld->lcf_proc_dir,
fld_client_proc_list, fld);
if (rc) {
- CERROR("can't init FLD "
- "proc, rc %d\n", rc);
- GOTO(err, rc);
+ CERROR("%s: Can't init FLD proc, rc %d\n",
+ fld->lcf_name, rc);
+ GOTO(out_cleanup, rc);
}
RETURN(0);
-err:
- fld->fld_proc_dir = NULL;
+out_cleanup:
+ fld_client_proc_fini(fld);
return rc;
}
-static void
-fld_client_proc_fini(struct lu_client_fld *fld)
+static void fld_client_proc_fini(struct lu_client_fld *fld)
{
ENTRY;
- if (fld->fld_proc_dir) {
- lprocfs_remove(fld->fld_proc_dir);
- fld->fld_proc_dir = NULL;
+ if (fld->lcf_proc_dir) {
+ if (!IS_ERR(fld->lcf_proc_dir))
+ lprocfs_remove(fld->lcf_proc_dir);
+ fld->lcf_proc_dir = NULL;
}
EXIT;
}
+#else
+static int fld_client_proc_init(struct lu_client_fld *fld)
+{
+ return 0;
+}
+
+static void fld_client_proc_fini(struct lu_client_fld *fld)
+{
+ return;
+}
#endif
-int
-fld_client_init(struct lu_client_fld *fld,
- const char *uuid, int hash)
+static inline int hash_is_sane(int hash)
{
- int rc = 0;
+ return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
+}
+
+/* 1M of FLD cache will not hurt client a lot */
+#define FLD_CACHE_SIZE 1024000
+
+/* cache threshold is 10 percent of size */
+#define FLD_CACHE_THRESHOLD 10
+
+int fld_client_init(struct lu_client_fld *fld,
+ const char *prefix, int hash)
+{
+#ifdef __KERNEL__
+ int cache_size, cache_threshold;
+#endif
+ int rc;
ENTRY;
LASSERT(fld != NULL);
- if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
- CERROR("wrong hash function 0x%x\n", hash);
+ snprintf(fld->lcf_name, sizeof(fld->lcf_name),
+ "cli-%s", prefix);
+
+ if (!hash_is_sane(hash)) {
+ CERROR("%s: Wrong hash function %#x\n",
+ fld->lcf_name, hash);
RETURN(-EINVAL);
}
-
- INIT_LIST_HEAD(&fld->fld_exports);
- spin_lock_init(&fld->fld_lock);
- fld->fld_hash = &fld_hash[hash];
- fld->fld_count = 0;
-
- snprintf(fld->fld_name, sizeof(fld->fld_name),
- "%s-%s", LUSTRE_FLD_NAME, uuid);
-
-#ifdef LPROCFS
+
+ fld->lcf_count = 0;
+ spin_lock_init(&fld->lcf_lock);
+ fld->lcf_hash = &fld_hash[hash];
+ INIT_LIST_HEAD(&fld->lcf_targets);
+
+#ifdef __KERNEL__
+ cache_size = FLD_CACHE_SIZE /
+ sizeof(struct fld_cache_entry);
+
+ cache_threshold = cache_size *
+ FLD_CACHE_THRESHOLD / 100;
+
+ fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
+ cache_size,
+ cache_threshold);
+ if (IS_ERR(fld->lcf_cache)) {
+ rc = PTR_ERR(fld->lcf_cache);
+ fld->lcf_cache = NULL;
+ GOTO(out, rc);
+ }
+#endif
+
rc = fld_client_proc_init(fld);
if (rc)
GOTO(out, rc);
-#endif
EXIT;
out:
if (rc)
fld_client_fini(fld);
- else
+ else
CDEBUG(D_INFO|D_WARNING,
- "Client FLD, using \"%s\" hash\n",
- fld->fld_hash->fh_name);
+ "%s: Using \"%s\" hash\n",
+ fld->lcf_name, fld->lcf_hash->fh_name);
return rc;
}
EXPORT_SYMBOL(fld_client_init);
-void
-fld_client_fini(struct lu_client_fld *fld)
+void fld_client_fini(struct lu_client_fld *fld)
{
- struct obd_export *fld_exp;
- struct obd_export *tmp;
+ struct lu_fld_target *target, *tmp;
ENTRY;
-#ifdef LPROCFS
fld_client_proc_fini(fld);
-#endif
-
- spin_lock(&fld->fld_lock);
- list_for_each_entry_safe(fld_exp, tmp,
- &fld->fld_exports, exp_fld_chain) {
- fld->fld_count--;
- list_del(&fld_exp->exp_fld_chain);
- class_export_get(fld_exp);
+
+ spin_lock(&fld->lcf_lock);
+ list_for_each_entry_safe(target, tmp,
+ &fld->lcf_targets, ft_chain) {
+ fld->lcf_count--;
+ list_del(&target->ft_chain);
+ if (target->ft_exp != NULL)
+ class_export_put(target->ft_exp);
+ OBD_FREE_PTR(target);
+ }
+ spin_unlock(&fld->lcf_lock);
+
+#ifdef __KERNEL__
+ if (fld->lcf_cache != NULL) {
+ if (!IS_ERR(fld->lcf_cache))
+ fld_cache_fini(fld->lcf_cache);
+ fld->lcf_cache = NULL;
}
- spin_unlock(&fld->fld_lock);
- CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
+#endif
+
EXIT;
}
EXPORT_SYMBOL(fld_client_fini);
-static int
-fld_client_rpc(struct obd_export *exp,
- struct md_fld *mf, __u32 fld_op)
+static int fld_client_rpc(struct obd_export *exp,
+ struct md_fld *mf, __u32 fld_op)
{
- int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
- int mf_size = sizeof(struct md_fld);
+ int size[3] = { sizeof(struct ptlrpc_body),
+ sizeof(__u32),
+ sizeof(struct md_fld) };
struct ptlrpc_request *req;
+ struct req_capsule pill;
struct md_fld *pmf;
__u32 *op;
+ int rc;
ENTRY;
LASSERT(exp != NULL);
req = ptlrpc_prep_req(class_exp2cliimp(exp),
- LUSTRE_MDS_VERSION, FLD_QUERY,
- 2, size, NULL);
+ LUSTRE_MDS_VERSION,
+ FLD_QUERY, 3, size,
+ NULL);
if (req == NULL)
RETURN(-ENOMEM);
- op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+ req_capsule_init(&pill, req, RCL_CLIENT, NULL);
+ req_capsule_set(&pill, &RQF_FLD_QUERY);
+
+ op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
*op = fld_op;
- pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
- memcpy(pmf, mf, sizeof(*mf));
+ pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
+ *pmf = *mf;
- req->rq_replen = lustre_msg_size(1, &mf_size);
+ size[1] = sizeof(struct md_fld);
+ ptlrpc_req_set_repsize(req, 2, size);
req->rq_request_portal = FLD_REQUEST_PORTAL;
rc = ptlrpc_queue_wait(req);
if (rc)
GOTO(out_req, rc);
- pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
- lustre_swab_md_fld);
- *mf = *pmf;
+ pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
+ if (pmf == NULL)
+ GOTO(out_req, rc = -EFAULT);
+ *mf = *pmf;
+ EXIT;
out_req:
+ req_capsule_fini(&pill);
ptlrpc_req_finished(req);
- RETURN(rc);
+ return rc;
}
-int
-fld_client_create(struct lu_client_fld *fld,
- __u64 seq, mdsno_t mds)
+int fld_client_create(struct lu_client_fld *fld,
+ seqno_t seq, mdsno_t mds,
+ const struct lu_env *env)
{
- struct obd_export *fld_exp;
- struct md_fld md_fld;
- __u32 rc;
+ struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
+ struct lu_fld_target *target;
+ int rc;
ENTRY;
- fld_exp = fld_client_get_target(fld, seq);
- if (!fld_exp)
- RETURN(-EINVAL);
- md_fld.mf_seq = seq;
- md_fld.mf_mds = mds;
-
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
-
+ target = fld_client_get_target(fld, seq);
+ LASSERT(target != NULL);
+
#ifdef __KERNEL__
- if (rc == 0)
- rc = fld_cache_insert(fld_cache, seq, mds);
+ if (target->ft_srv != NULL) {
+ LASSERT(env != NULL);
+ rc = fld_server_create(target->ft_srv,
+ env, seq, mds);
+ } else {
#endif
-
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+ }
+#endif
+
+ if (rc == 0) {
+ /*
+ * Do not return result of calling fld_cache_insert()
+ * here. First of all because it may return -EEXISTS. Another
+ * reason is that, we do not want to stop proceeding because of
+ * cache errors. --umka
+ */
+ fld_cache_insert(fld->lcf_cache, seq, mds);
+ } else {
+ CERROR("%s: Can't create FLD entry, rc %d\n",
+ fld->lcf_name, rc);
+ }
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_create);
-int
-fld_client_delete(struct lu_client_fld *fld,
- __u64 seq)
+int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
+ const struct lu_env *env)
{
- struct obd_export *fld_exp;
- struct md_fld md_fld;
- __u32 rc;
+ struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+ struct lu_fld_target *target;
+ int rc;
+ ENTRY;
+
+ fld_cache_delete(fld->lcf_cache, seq);
+
+ target = fld_client_get_target(fld, seq);
+ LASSERT(target != NULL);
#ifdef __KERNEL__
- fld_cache_delete(fld_cache, seq);
+ if (target->ft_srv != NULL) {
+ LASSERT(env != NULL);
+ rc = fld_server_delete(target->ft_srv,
+ env, seq);
+ } else {
+#endif
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+ }
#endif
-
- fld_exp = fld_client_get_target(fld, seq);
- if (!fld_exp)
- RETURN(-EINVAL);
-
- md_fld.mf_seq = seq;
- md_fld.mf_mds = 0;
- rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_delete);
-static int
-fld_client_get(struct lu_client_fld *fld,
- __u64 seq, mdsno_t *mds)
+int fld_client_lookup(struct lu_client_fld *fld,
+ seqno_t seq, mdsno_t *mds,
+ const struct lu_env *env)
{
- struct obd_export *fld_exp;
- struct md_fld md_fld;
+ struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+ struct lu_fld_target *target;
int rc;
ENTRY;
- fld_exp = fld_client_get_target(fld, seq);
- if (!fld_exp)
- RETURN(-EINVAL);
-
- md_fld.mf_seq = seq;
- rc = fld_client_rpc(fld_exp,
- &md_fld, FLD_LOOKUP);
+ /* lookup it in the cache */
+ rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
if (rc == 0)
- *mds = md_fld.mf_mds;
+ RETURN(0);
- RETURN(rc);
-}
+ /* can not find it in the cache */
+ target = fld_client_get_target(fld, seq);
+ LASSERT(target != NULL);
-/* lookup fid in the namespace of pfid according to the name */
-int
-fld_client_lookup(struct lu_client_fld *fld,
- __u64 seq, mdsno_t *mds)
-{
#ifdef __KERNEL__
- struct fld_cache_entry *flde;
+ if (target->ft_srv != NULL) {
+ LASSERT(env != NULL);
+ rc = fld_server_lookup(target->ft_srv,
+ env, seq, &md_fld.mf_mds);
+ } else {
#endif
- int rc;
- ENTRY;
-
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_LOOKUP);
#ifdef __KERNEL__
- /* lookup it in the cache */
- flde = fld_cache_lookup(fld_cache, seq);
- if (flde != NULL) {
- *mds = flde->fce_mds;
- RETURN(0);
}
#endif
-
- /* can not find it in the cache */
- rc = fld_client_get(fld, seq, mds);
- if (rc)
- RETURN(rc);
+ if (rc == 0) {
+ *mds = md_fld.mf_mds;
-#ifdef __KERNEL__
- rc = fld_cache_insert(fld_cache, seq, *mds);
-#endif
-
+ /*
+ * Do not return error here as well. See previous comment in
+ * same situation in function fld_client_create(). --umka
+ */
+ fld_cache_insert(fld->lcf_cache, seq, *mds);
+ }
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_lookup);