Whamcloud - gitweb
Introduction of lu_env.
[fs/lustre-release.git] / lustre / fld / fld_request.c
index edb7e35..1933b27 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/fld/fld_handler.c
+ *  lustre/fld/fld_request.c
  *  FLD (Fids Location Database)
  *
  *  Copyright (C) 2006 Cluster File Systems, Inc.
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
-#ifdef __KERNEL__
-static __u32
-fld_cache_hash(__u64 seq)
+static int fld_rrb_hash(struct lu_client_fld *fld,
+                        seqno_t seq)
 {
-        return (__u32)seq;
+        LASSERT(fld->lcf_count > 0);
+        return do_div(seq, fld->lcf_count);
 }
 
-static int
-fld_cache_insert(struct fld_cache_info *fld_cache,
-                 __u64 seq, __u64 mds)
+static struct lu_fld_target *
+fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
 {
-        struct fld_cache_entry *flde, *fldt;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        int rc = 0;
+        struct lu_fld_target *target;
+        int hash;
         ENTRY;
 
-        OBD_ALLOC_PTR(flde);
-        if (!flde)
-                RETURN(-ENOMEM);
-
-        bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fci_hash_mask);
-
-        spin_lock(&fld_cache->fci_lock);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq)
-                        GOTO(exit_unlock, rc = -EEXIST);
-        }
-
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_mds = mds;
-        flde->fce_seq = seq;
-        
-        hlist_add_head(&flde->fce_list, bucket);
+        hash = fld_rrb_hash(fld, seq);
 
-        EXIT;
-exit_unlock:
-        spin_unlock(&fld_cache->fci_lock);
-        if (rc != 0)
-                OBD_FREE_PTR(flde);
-        return rc;
-}
-
-static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
-{
-        struct fld_cache_entry *flde;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        ENTRY;
-
-        bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fci_hash_mask);
-
-        spin_lock(&fld_cache->fci_lock);
-        hlist_for_each_entry(flde, scan, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        hlist_del_init(&flde->fce_list);
-                        GOTO(out_unlock, 0);
-                }
+        list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
+                if (target->ft_idx == hash)
+                        RETURN(target);
         }
 
-        EXIT;
-out_unlock:
-        spin_unlock(&fld_cache->fci_lock);
-        return;
-}
-
-static struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
-{
-        struct fld_cache_entry *flde;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        ENTRY;
-
-        bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fci_hash_mask);
-
-        spin_lock(&fld_cache->fci_lock);
-        hlist_for_each_entry(flde, scan, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        spin_unlock(&fld_cache->fci_lock);
-                        RETURN(flde);
-                }
-        }
-        spin_unlock(&fld_cache->fci_lock);
+        /*
+         * If target is not found, there is logical error anyway, so here is
+         * LBUG() to catch this situation.
+         */
+        LBUG();
         RETURN(NULL);
 }
-#endif
 
-static int
-fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
+static int fld_dht_hash(struct lu_client_fld *fld,
+                        seqno_t seq)
 {
-        if (fld->fld_count == 0)
-                return 0;
-        
-        return do_div(seq, fld->fld_count);
+        /* XXX: here should be DHT hash */
+        return fld_rrb_hash(fld, seq);
 }
 
-static int
-fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
+static struct lu_fld_target *
+fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
 {
-        /* XXX: here should DHT hash */
-        return fld_rrb_hash(fld, seq);
+        /* XXX: here should be DHT scan code */
+        return fld_rrb_scan(fld, seq);
 }
 
 struct lu_fld_hash fld_hash[3] = {
         {
                 .fh_name = "DHT",
-                .fh_func = fld_dht_hash
+                .fh_hash_func = fld_dht_hash,
+                .fh_scan_func = fld_dht_scan
         },
         {
-                .fh_name = "Round Robin",
-                .fh_func = fld_rrb_hash
+                .fh_name = "RRB",
+                .fh_hash_func = fld_rrb_hash,
+                .fh_scan_func = fld_rrb_scan
         },
         {
                 0,
         }
 };
 
-static struct obd_export *
-fld_client_get_target(struct lu_client_fld *fld, __u64 seq)
+static struct lu_fld_target *
+fld_client_get_target(struct lu_client_fld *fld,
+                      seqno_t seq)
 {
-        struct obd_export *fld_exp;
-        int count = 0, hash;
+        struct lu_fld_target *target;
         ENTRY;
 
-        LASSERT(fld->fld_hash != NULL);
+        LASSERT(fld->lcf_hash != NULL);
 
-        spin_lock(&fld->fld_lock);
-        hash = fld->fld_hash->fh_func(fld, seq);
-        spin_unlock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
+        target = fld->lcf_hash->fh_scan_func(fld, seq);
+        spin_unlock(&fld->lcf_lock);
 
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp,
-                            &fld->fld_exports, exp_fld_chain) {
-                if (count == hash) {
-                        spin_unlock(&fld->fld_lock);
-                        RETURN(fld_exp);
-                }
-                count++;
-        }
-        spin_unlock(&fld->fld_lock);
-        RETURN(NULL);
+        RETURN(target);
 }
 
-/* add export to FLD. This is usually done by CMM and LMV as they are main users
- * of FLD module. */
-int
-fld_client_add_target(struct lu_client_fld *fld,
-                      struct obd_export *exp)
+/*
+ * Add export to FLD. This is usually done by CMM and LMV as they are main users
+ * of FLD module.
+ */
+int fld_client_add_target(struct lu_client_fld *fld,
+                          struct lu_fld_target *tar)
 {
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct obd_export *fld_exp;
+        const char *name = fld_target_name(tar);
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
-        LASSERT(exp != NULL);
+        LASSERT(tar != NULL);
+        LASSERT(name != NULL);
+        LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
+
+        CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s\n",
+              fld->lcf_name, name);
 
-        CDEBUG(D_INFO|D_WARNING, "FLD(cli): adding export %s\n",
-              cli->cl_target_uuid.uuid);
-        
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
-                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
-                {
-                        spin_unlock(&fld->fld_lock);
+        OBD_ALLOC_PTR(target);
+        if (target == NULL)
+                RETURN(-ENOMEM);
+
+        spin_lock(&fld->lcf_lock);
+        list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
+                if (tmp->ft_idx == tar->ft_idx) {
+                        spin_unlock(&fld->lcf_lock);
+                        OBD_FREE_PTR(target);
+                        CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
+                               name, fld_target_name(tmp), tmp->ft_idx);
                         RETURN(-EEXIST);
                 }
         }
-        
-        fld_exp = class_export_get(exp);
-        list_add_tail(&fld_exp->exp_fld_chain,
-                      &fld->fld_exports);
-        fld->fld_count++;
-        
-        spin_unlock(&fld->fld_lock);
-        
+
+        target->ft_exp = tar->ft_exp;
+        if (target->ft_exp != NULL)
+                class_export_get(target->ft_exp);
+        target->ft_srv = tar->ft_srv;
+        target->ft_idx = tar->ft_idx;
+
+        list_add_tail(&target->ft_chain,
+                      &fld->lcf_targets);
+
+        fld->lcf_count++;
+        spin_unlock(&fld->lcf_lock);
+
         RETURN(0);
 }
 EXPORT_SYMBOL(fld_client_add_target);
 
-/* remove export from FLD */
-int
-fld_client_del_target(struct lu_client_fld *fld,
-                          struct obd_export *exp)
+/* Remove export from FLD */
+int fld_client_del_target(struct lu_client_fld *fld,
+                          __u64 idx)
 {
-        struct obd_export *fld_exp;
-        struct obd_export *tmp;
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
-                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
-                {
-                        fld->fld_count--;
-                        list_del(&fld_exp->exp_fld_chain);
-                        class_export_get(fld_exp);
+        spin_lock(&fld->lcf_lock);
+        list_for_each_entry_safe(target, tmp,
+                                 &fld->lcf_targets, ft_chain) {
+                if (target->ft_idx == idx) {
+                        fld->lcf_count--;
+                        list_del(&target->ft_chain);
+                        spin_unlock(&fld->lcf_lock);
 
-                        spin_unlock(&fld->fld_lock);
+                        if (target->ft_exp != NULL)
+                                class_export_put(target->ft_exp);
+
+                        OBD_FREE_PTR(target);
                         RETURN(0);
                 }
         }
-        spin_unlock(&fld->fld_lock);
+        spin_unlock(&fld->lcf_lock);
         RETURN(-ENOENT);
 }
 EXPORT_SYMBOL(fld_client_del_target);
 
+static void fld_client_proc_fini(struct lu_client_fld *fld);
+
 #ifdef LPROCFS
-static int
-fld_client_proc_init(struct lu_client_fld *fld)
+static int fld_client_proc_init(struct lu_client_fld *fld)
 {
         int rc;
         ENTRY;
 
-        fld->fld_proc_dir = lprocfs_register(fld->fld_name,
-                                             proc_lustre_root,
+        fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
+                                             fld_type_proc_dir,
                                              NULL, NULL);
-        
-        if (IS_ERR(fld->fld_proc_dir)) {
-                CERROR("LProcFS failed in fld-init\n");
-                rc = PTR_ERR(fld->fld_proc_dir);
-                GOTO(err, rc);
+
+        if (IS_ERR(fld->lcf_proc_dir)) {
+                CERROR("%s: LProcFS failed in fld-init\n",
+                       fld->lcf_name);
+                rc = PTR_ERR(fld->lcf_proc_dir);
+                RETURN(rc);
         }
 
-        rc = lprocfs_add_vars(fld->fld_proc_dir,
+        rc = lprocfs_add_vars(fld->lcf_proc_dir,
                               fld_client_proc_list, fld);
         if (rc) {
-                CERROR("can't init FLD "
-                       "proc, rc %d\n", rc);
-                GOTO(err, rc);
+                CERROR("%s: Can't init FLD proc, rc %d\n",
+                       fld->lcf_name, rc);
+                GOTO(out_cleanup, rc);
         }
 
         RETURN(0);
 
-err:
-        fld->fld_proc_dir = NULL;
+out_cleanup:
+        fld_client_proc_fini(fld);
         return rc;
 }
 
-static void
-fld_client_proc_fini(struct lu_client_fld *fld)
+static void fld_client_proc_fini(struct lu_client_fld *fld)
 {
         ENTRY;
-        if (fld->fld_proc_dir) {
-                lprocfs_remove(fld->fld_proc_dir);
-                fld->fld_proc_dir = NULL;
+        if (fld->lcf_proc_dir) {
+                if (!IS_ERR(fld->lcf_proc_dir))
+                        lprocfs_remove(fld->lcf_proc_dir);
+                fld->lcf_proc_dir = NULL;
         }
         EXIT;
 }
+#else
+static int fld_client_proc_init(struct lu_client_fld *fld)
+{
+        return 0;
+}
+
+static void fld_client_proc_fini(struct lu_client_fld *fld)
+{
+        return;
+}
 #endif
 
-int
-fld_client_init(struct lu_client_fld *fld,
-                const char *uuid, int hash)
+static inline int hash_is_sane(int hash)
 {
-        int rc = 0;
+        return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
+}
+
+/* 1M of FLD cache will not hurt client a lot */
+#define FLD_CACHE_SIZE 1024000
+
+/* cache threshold is 10 percent of size */
+#define FLD_CACHE_THRESHOLD 10
+
+int fld_client_init(struct lu_client_fld *fld,
+                    const char *prefix, int hash)
+{
+#ifdef __KERNEL__
+        int cache_size, cache_threshold;
+#endif
+        int rc;
         ENTRY;
 
         LASSERT(fld != NULL);
 
-        if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
-                CERROR("wrong hash function 0x%x\n", hash);
+        snprintf(fld->lcf_name, sizeof(fld->lcf_name),
+                 "cli-%s", prefix);
+
+        if (!hash_is_sane(hash)) {
+                CERROR("%s: Wrong hash function %#x\n",
+                       fld->lcf_name, hash);
                 RETURN(-EINVAL);
         }
-        
-        INIT_LIST_HEAD(&fld->fld_exports);
-        spin_lock_init(&fld->fld_lock);
-        fld->fld_hash = &fld_hash[hash];
-        fld->fld_count = 0;
-        
-        snprintf(fld->fld_name, sizeof(fld->fld_name),
-                 "%s-%s", LUSTRE_FLD_NAME, uuid);
-        
-#ifdef LPROCFS
+
+        fld->lcf_count = 0;
+        spin_lock_init(&fld->lcf_lock);
+        fld->lcf_hash = &fld_hash[hash];
+        INIT_LIST_HEAD(&fld->lcf_targets);
+
+#ifdef __KERNEL__
+        cache_size = FLD_CACHE_SIZE /
+                sizeof(struct fld_cache_entry);
+
+        cache_threshold = cache_size *
+                FLD_CACHE_THRESHOLD / 100;
+
+        fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
+                                        cache_size,
+                                        cache_threshold);
+        if (IS_ERR(fld->lcf_cache)) {
+                rc = PTR_ERR(fld->lcf_cache);
+                fld->lcf_cache = NULL;
+                GOTO(out, rc);
+        }
+#endif
+
         rc = fld_client_proc_init(fld);
         if (rc)
                 GOTO(out, rc);
-#endif
         EXIT;
 out:
         if (rc)
                 fld_client_fini(fld);
-        else 
+        else
                 CDEBUG(D_INFO|D_WARNING,
-                       "Client FLD, using \"%s\" hash\n",
-                       fld->fld_hash->fh_name);
+                       "%s: Using \"%s\" hash\n",
+                       fld->lcf_name, fld->lcf_hash->fh_name);
         return rc;
 }
 EXPORT_SYMBOL(fld_client_init);
 
-void
-fld_client_fini(struct lu_client_fld *fld)
+void fld_client_fini(struct lu_client_fld *fld)
 {
-        struct obd_export *fld_exp;
-        struct obd_export *tmp;
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
-#ifdef LPROCFS
         fld_client_proc_fini(fld);
-#endif
-        
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp,
-                                 &fld->fld_exports, exp_fld_chain) {
-                fld->fld_count--;
-                list_del(&fld_exp->exp_fld_chain);
-                class_export_get(fld_exp);
+
+        spin_lock(&fld->lcf_lock);
+        list_for_each_entry_safe(target, tmp,
+                                 &fld->lcf_targets, ft_chain) {
+                fld->lcf_count--;
+                list_del(&target->ft_chain);
+                if (target->ft_exp != NULL)
+                        class_export_put(target->ft_exp);
+                OBD_FREE_PTR(target);
+        }
+        spin_unlock(&fld->lcf_lock);
+
+#ifdef __KERNEL__
+        if (fld->lcf_cache != NULL) {
+                if (!IS_ERR(fld->lcf_cache))
+                        fld_cache_fini(fld->lcf_cache);
+                fld->lcf_cache = NULL;
         }
-        spin_unlock(&fld->fld_lock);
-        CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
+#endif
+
         EXIT;
 }
 EXPORT_SYMBOL(fld_client_fini);
 
-static int
-fld_client_rpc(struct obd_export *exp,
-               struct md_fld *mf, __u32 fld_op)
+static int fld_client_rpc(struct obd_export *exp,
+                          struct md_fld *mf, __u32 fld_op)
 {
-        int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
-        int mf_size = sizeof(struct md_fld);
+        int size[3] = { sizeof(struct ptlrpc_body),
+                        sizeof(__u32),
+                        sizeof(struct md_fld) };
         struct ptlrpc_request *req;
+        struct req_capsule pill;
         struct md_fld *pmf;
         __u32 *op;
+        int rc;
         ENTRY;
 
         LASSERT(exp != NULL);
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                              LUSTRE_MDS_VERSION, FLD_QUERY,
-                              2, size, NULL);
+                              LUSTRE_MDS_VERSION,
+                              FLD_QUERY, 3, size,
+                              NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+        req_capsule_init(&pill, req, RCL_CLIENT, NULL);
+        req_capsule_set(&pill, &RQF_FLD_QUERY);
+
+        op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
         *op = fld_op;
 
-        pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
-        memcpy(pmf, mf, sizeof(*mf));
+        pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
+        *pmf = *mf;
 
-        req->rq_replen = lustre_msg_size(1, &mf_size);
+        size[1] = sizeof(struct md_fld);
+        ptlrpc_req_set_repsize(req, 2, size);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
 
         rc = ptlrpc_queue_wait(req);
         if (rc)
                 GOTO(out_req, rc);
 
-        pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
-                                 lustre_swab_md_fld);
-        *mf = *pmf; 
+        pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
+        if (pmf == NULL)
+                GOTO(out_req, rc = -EFAULT);
+        *mf = *pmf;
+        EXIT;
 out_req:
+        req_capsule_fini(&pill);
         ptlrpc_req_finished(req);
-        RETURN(rc);
+        return rc;
 }
 
-int
-fld_client_create(struct lu_client_fld *fld,
-                  __u64 seq, mdsno_t mds)
+int fld_client_create(struct lu_client_fld *fld,
+                      seqno_t seq, mdsno_t mds,
+                      const struct lu_env *env)
 {
-        struct obd_export *fld_exp;
-        struct md_fld      md_fld;
-        __u32 rc;
+        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
+        struct lu_fld_target *target;
+        int rc;
         ENTRY;
 
-        fld_exp = fld_client_get_target(fld, seq);
-        if (!fld_exp)
-                RETURN(-EINVAL);
-        md_fld.mf_seq = seq;
-        md_fld.mf_mds = mds;
-        
-        rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
-        
+        target = fld_client_get_target(fld, seq);
+        LASSERT(target != NULL);
+
 #ifdef __KERNEL__
-        if (rc  == 0)
-                rc = fld_cache_insert(fld_cache, seq, mds);
+        if (target->ft_srv != NULL) {
+                LASSERT(env != NULL);
+                rc = fld_server_create(target->ft_srv,
+                                       env, seq, mds);
+        } else {
 #endif
-        
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+        }
+#endif
+
+        if (rc == 0) {
+                /*
+                 * Do not return result of calling fld_cache_insert()
+                 * here. First of all because it may return -EEXISTS. Another
+                 * reason is that, we do not want to stop proceeding because of
+                 * cache errors. --umka
+                 */
+                fld_cache_insert(fld->lcf_cache, seq, mds);
+        } else {
+                CERROR("%s: Can't create FLD entry, rc %d\n",
+                       fld->lcf_name, rc);
+        }
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_create);
 
-int
-fld_client_delete(struct lu_client_fld *fld,
-                  __u64 seq)
+int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
+                      const struct lu_env *env)
 {
-        struct obd_export *fld_exp;
-        struct md_fld      md_fld;
-        __u32 rc;
+        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+        struct lu_fld_target *target;
+        int rc;
+        ENTRY;
+
+        fld_cache_delete(fld->lcf_cache, seq);
+
+        target = fld_client_get_target(fld, seq);
+        LASSERT(target != NULL);
 
 #ifdef __KERNEL__
-        fld_cache_delete(fld_cache, seq);
+        if (target->ft_srv != NULL) {
+                LASSERT(env != NULL);
+                rc = fld_server_delete(target->ft_srv,
+                                       env, seq);
+        } else {
+#endif
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+        }
 #endif
-        
-        fld_exp = fld_client_get_target(fld, seq);
-        if (!fld_exp)
-                RETURN(-EINVAL);
-
-        md_fld.mf_seq = seq;
-        md_fld.mf_mds = 0;
 
-        rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_delete);
 
-static int
-fld_client_get(struct lu_client_fld *fld,
-               __u64 seq, mdsno_t *mds)
+int fld_client_lookup(struct lu_client_fld *fld,
+                      seqno_t seq, mdsno_t *mds,
+                      const struct lu_env *env)
 {
-        struct obd_export *fld_exp;
-        struct md_fld md_fld;
+        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+        struct lu_fld_target *target;
         int rc;
         ENTRY;
 
-        fld_exp = fld_client_get_target(fld, seq);
-        if (!fld_exp)
-                RETURN(-EINVAL);
-                
-        md_fld.mf_seq = seq;
-        rc = fld_client_rpc(fld_exp,
-                            &md_fld, FLD_LOOKUP);
+        /* lookup it in the cache */
+        rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
         if (rc == 0)
-                *mds = md_fld.mf_mds;
+                RETURN(0);
 
-        RETURN(rc);
-}
+        /* can not find it in the cache */
+        target = fld_client_get_target(fld, seq);
+        LASSERT(target != NULL);
 
-/* lookup fid in the namespace of pfid according to the name */
-int
-fld_client_lookup(struct lu_client_fld *fld,
-                  __u64 seq, mdsno_t *mds)
-{
 #ifdef __KERNEL__
-        struct fld_cache_entry *flde;
+        if (target->ft_srv != NULL) {
+                LASSERT(env != NULL);
+                rc = fld_server_lookup(target->ft_srv,
+                                       env, seq, &md_fld.mf_mds);
+        } else {
 #endif
-        int rc;
-        ENTRY;
-
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_LOOKUP);
 #ifdef __KERNEL__
-        /* lookup it in the cache */
-        flde = fld_cache_lookup(fld_cache, seq);
-        if (flde != NULL) {
-                *mds = flde->fce_mds;
-                RETURN(0);
         }
 #endif
-        
-        /* can not find it in the cache */
-        rc = fld_client_get(fld, seq, mds);
-        if (rc)
-                RETURN(rc);
+        if (rc == 0) {
+                *mds = md_fld.mf_mds;
 
-#ifdef __KERNEL__
-        rc = fld_cache_insert(fld_cache, seq, *mds);
-#endif
-        
+                /*
+                 * Do not return error here as well. See previous comment in
+                 * same situation in function fld_client_create(). --umka
+                 */
+                fld_cache_insert(fld->lcf_cache, seq, *mds);
+        }
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_lookup);