Whamcloud - gitweb
Introduction of lu_env.
[fs/lustre-release.git] / lustre / fld / fld_request.c
index 7389e4a..1933b27 100644 (file)
 static int fld_rrb_hash(struct lu_client_fld *fld,
                         seqno_t seq)
 {
-        LASSERT(fld->fld_count > 0);
-        return do_div(seq, fld->fld_count);
+        LASSERT(fld->lcf_count > 0);
+        return do_div(seq, fld->lcf_count);
 }
 
-static struct fld_target *
+static struct lu_fld_target *
 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
 {
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int hash;
         ENTRY;
 
         hash = fld_rrb_hash(fld, seq);
 
-        list_for_each_entry(target, &fld->fld_targets, fldt_chain) {
-                if (target->fldt_idx == hash)
+        list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
+                if (target->ft_idx == hash)
                         RETURN(target);
         }
 
-        /* if target is not found, there is logical error anyway, so here is
-         * LBUG() to catch this situation. */
+        /*
+         * If target is not found, there is logical error anyway, so here is
+         * LBUG() to catch this situation.
+         */
         LBUG();
         RETURN(NULL);
 }
@@ -86,11 +88,11 @@ static int fld_dht_hash(struct lu_client_fld *fld,
         return fld_rrb_hash(fld, seq);
 }
 
-static struct fld_target *
+static struct lu_fld_target *
 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
 {
         /* XXX: here should be DHT scan code */
-        return fld_dht_scan(fld, seq);
+        return fld_rrb_scan(fld, seq);
 }
 
 struct lu_fld_hash fld_hash[3] = {
@@ -109,19 +111,19 @@ struct lu_fld_hash fld_hash[3] = {
         }
 };
 
-static struct fld_target *
+static struct lu_fld_target *
 fld_client_get_target(struct lu_client_fld *fld,
                       seqno_t seq)
 {
-        struct fld_target *target;
+        struct lu_fld_target *target;
         ENTRY;
 
-        LASSERT(fld->fld_hash != NULL);
+        LASSERT(fld->lcf_hash != NULL);
+
+        spin_lock(&fld->lcf_lock);
+        target = fld->lcf_hash->fh_scan_func(fld, seq);
+        spin_unlock(&fld->lcf_lock);
 
-        spin_lock(&fld->fld_lock);
-        target = fld->fld_hash->fh_scan_func(fld, seq);
-        spin_unlock(&fld->fld_lock);
-        
         RETURN(target);
 }
 
@@ -130,66 +132,73 @@ fld_client_get_target(struct lu_client_fld *fld,
  * of FLD module.
  */
 int fld_client_add_target(struct lu_client_fld *fld,
-                          struct obd_export *exp)
+                          struct lu_fld_target *tar)
 {
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct fld_target *target, *tmp;
+        const char *name = fld_target_name(tar);
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
-        LASSERT(exp != NULL);
+        LASSERT(tar != NULL);
+        LASSERT(name != NULL);
+        LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
 
-        CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
-              fld->fld_name, cli->cl_target_uuid.uuid);
+        CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s\n",
+              fld->lcf_name, name);
 
         OBD_ALLOC_PTR(target);
         if (target == NULL)
                 RETURN(-ENOMEM);
 
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
-                if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
-                {
-                        spin_unlock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
+        list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
+                if (tmp->ft_idx == tar->ft_idx) {
+                        spin_unlock(&fld->lcf_lock);
                         OBD_FREE_PTR(target);
+                        CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
+                               name, fld_target_name(tmp), tmp->ft_idx);
                         RETURN(-EEXIST);
                 }
         }
 
-        target->fldt_exp = class_export_get(exp);
-        target->fldt_idx = fld->fld_count;
+        target->ft_exp = tar->ft_exp;
+        if (target->ft_exp != NULL)
+                class_export_get(target->ft_exp);
+        target->ft_srv = tar->ft_srv;
+        target->ft_idx = tar->ft_idx;
 
-        list_add_tail(&target->fldt_chain,
-                      &fld->fld_targets);
-        fld->fld_count++;
-        spin_unlock(&fld->fld_lock);
+        list_add_tail(&target->ft_chain,
+                      &fld->lcf_targets);
+
+        fld->lcf_count++;
+        spin_unlock(&fld->lcf_lock);
 
         RETURN(0);
 }
 EXPORT_SYMBOL(fld_client_add_target);
 
-/* remove export from FLD */
+/* Remove export from FLD */
 int fld_client_del_target(struct lu_client_fld *fld,
-                          struct obd_export *exp)
+                          __u64 idx)
 {
-        struct fld_target *target, *tmp;
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
-        spin_lock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
         list_for_each_entry_safe(target, tmp,
-                                 &fld->fld_targets, fldt_chain) {
-                if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
-                {
-                        fld->fld_count--;
-                        list_del(&target->fldt_chain);
-                        spin_unlock(&fld->fld_lock);
-                        class_export_put(target->fldt_exp);
+                                 &fld->lcf_targets, ft_chain) {
+                if (target->ft_idx == idx) {
+                        fld->lcf_count--;
+                        list_del(&target->ft_chain);
+                        spin_unlock(&fld->lcf_lock);
+
+                        if (target->ft_exp != NULL)
+                                class_export_put(target->ft_exp);
+
                         OBD_FREE_PTR(target);
                         RETURN(0);
                 }
         }
-        spin_unlock(&fld->fld_lock);
+        spin_unlock(&fld->lcf_lock);
         RETURN(-ENOENT);
 }
 EXPORT_SYMBOL(fld_client_del_target);
@@ -202,21 +211,22 @@ static int fld_client_proc_init(struct lu_client_fld *fld)
         int rc;
         ENTRY;
 
-        fld->fld_proc_dir = lprocfs_register(fld->fld_name,
-                                             proc_lustre_root,
+        fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
+                                             fld_type_proc_dir,
                                              NULL, NULL);
 
-        if (IS_ERR(fld->fld_proc_dir)) {
-                CERROR("LProcFS failed in fld-init\n");
-                rc = PTR_ERR(fld->fld_proc_dir);
+        if (IS_ERR(fld->lcf_proc_dir)) {
+                CERROR("%s: LProcFS failed in fld-init\n",
+                       fld->lcf_name);
+                rc = PTR_ERR(fld->lcf_proc_dir);
                 RETURN(rc);
         }
 
-        rc = lprocfs_add_vars(fld->fld_proc_dir,
+        rc = lprocfs_add_vars(fld->lcf_proc_dir,
                               fld_client_proc_list, fld);
         if (rc) {
-                CERROR("can't init FLD "
-                       "proc, rc %d\n", rc);
+                CERROR("%s: Can't init FLD proc, rc %d\n",
+                       fld->lcf_name, rc);
                 GOTO(out_cleanup, rc);
         }
 
@@ -230,10 +240,10 @@ out_cleanup:
 static void fld_client_proc_fini(struct lu_client_fld *fld)
 {
         ENTRY;
-        if (fld->fld_proc_dir) {
-                if (!IS_ERR(fld->fld_proc_dir))
-                        lprocfs_remove(fld->fld_proc_dir);
-                fld->fld_proc_dir = NULL;
+        if (fld->lcf_proc_dir) {
+                if (!IS_ERR(fld->lcf_proc_dir))
+                        lprocfs_remove(fld->lcf_proc_dir);
+                fld->lcf_proc_dir = NULL;
         }
         EXIT;
 }
@@ -254,32 +264,50 @@ static inline int hash_is_sane(int hash)
         return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
 }
 
+/* 1M of FLD cache will not hurt client a lot */
+#define FLD_CACHE_SIZE 1024000
+
+/* cache threshold is 10 percent of size */
+#define FLD_CACHE_THRESHOLD 10
+
 int fld_client_init(struct lu_client_fld *fld,
-                    const char *uuid, int hash)
+                    const char *prefix, int hash)
 {
-        int rc = 0;
+#ifdef __KERNEL__
+        int cache_size, cache_threshold;
+#endif
+        int rc;
         ENTRY;
 
         LASSERT(fld != NULL);
 
+        snprintf(fld->lcf_name, sizeof(fld->lcf_name),
+                 "cli-%s", prefix);
+
         if (!hash_is_sane(hash)) {
-                CERROR("wrong hash function %#x\n", hash);
+                CERROR("%s: Wrong hash function %#x\n",
+                       fld->lcf_name, hash);
                 RETURN(-EINVAL);
         }
 
-        INIT_LIST_HEAD(&fld->fld_targets);
-        spin_lock_init(&fld->fld_lock);
-        fld->fld_hash = &fld_hash[hash];
-        fld->fld_count = 0;
-
-        snprintf(fld->fld_name, sizeof(fld->fld_name),
-                 "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
+        fld->lcf_count = 0;
+        spin_lock_init(&fld->lcf_lock);
+        fld->lcf_hash = &fld_hash[hash];
+        INIT_LIST_HEAD(&fld->lcf_targets);
 
 #ifdef __KERNEL__
-        fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
-        if (IS_ERR(fld->fld_cache)) {
-                rc = PTR_ERR(fld->fld_cache);
-                fld->fld_cache = NULL;
+        cache_size = FLD_CACHE_SIZE /
+                sizeof(struct fld_cache_entry);
+
+        cache_threshold = cache_size *
+                FLD_CACHE_THRESHOLD / 100;
+
+        fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
+                                        cache_size,
+                                        cache_threshold);
+        if (IS_ERR(fld->lcf_cache)) {
+                rc = PTR_ERR(fld->lcf_cache);
+                fld->lcf_cache = NULL;
                 GOTO(out, rc);
         }
 #endif
@@ -293,37 +321,38 @@ out:
                 fld_client_fini(fld);
         else
                 CDEBUG(D_INFO|D_WARNING,
-                       "Client FLD, using \"%s\" hash\n",
-                       fld->fld_hash->fh_name);
+                       "%s: Using \"%s\" hash\n",
+                       fld->lcf_name, fld->lcf_hash->fh_name);
         return rc;
 }
 EXPORT_SYMBOL(fld_client_init);
 
 void fld_client_fini(struct lu_client_fld *fld)
 {
-        struct fld_target *target, *tmp;
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
         fld_client_proc_fini(fld);
 
-        spin_lock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
         list_for_each_entry_safe(target, tmp,
-                                 &fld->fld_targets, fldt_chain) {
-                fld->fld_count--;
-                list_del(&target->fldt_chain);
-                class_export_put(target->fldt_exp);
+                                 &fld->lcf_targets, ft_chain) {
+                fld->lcf_count--;
+                list_del(&target->ft_chain);
+                if (target->ft_exp != NULL)
+                        class_export_put(target->ft_exp);
                 OBD_FREE_PTR(target);
         }
-        spin_unlock(&fld->fld_lock);
+        spin_unlock(&fld->lcf_lock);
 
 #ifdef __KERNEL__
-        if (fld->fld_cache != NULL) {
-                fld_cache_fini(fld->fld_cache);
-                fld->fld_cache = NULL;
+        if (fld->lcf_cache != NULL) {
+                if (!IS_ERR(fld->lcf_cache))
+                        fld_cache_fini(fld->lcf_cache);
+                fld->lcf_cache = NULL;
         }
 #endif
 
-        CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
         EXIT;
 }
 EXPORT_SYMBOL(fld_client_fini);
@@ -331,24 +360,26 @@ EXPORT_SYMBOL(fld_client_fini);
 static int fld_client_rpc(struct obd_export *exp,
                           struct md_fld *mf, __u32 fld_op)
 {
-        int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
-        int mf_size = sizeof(struct md_fld);
+        int size[3] = { sizeof(struct ptlrpc_body),
+                        sizeof(__u32),
+                        sizeof(struct md_fld) };
         struct ptlrpc_request *req;
         struct req_capsule pill;
         struct md_fld *pmf;
         __u32 *op;
+        int rc;
         ENTRY;
 
         LASSERT(exp != NULL);
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                              LUSTRE_MDS_VERSION, FLD_QUERY,
-                              2, size, NULL);
+                              LUSTRE_MDS_VERSION,
+                              FLD_QUERY, 3, size,
+                              NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
-
         req_capsule_set(&pill, &RQF_FLD_QUERY);
 
         op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
@@ -357,7 +388,8 @@ static int fld_client_rpc(struct obd_export *exp,
         pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
         *pmf = *mf;
 
-        req->rq_replen = lustre_msg_size(1, &mf_size);
+        size[1] = sizeof(struct md_fld);
+        ptlrpc_req_set_repsize(req, 2, size);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
 
         rc = ptlrpc_queue_wait(req);
@@ -376,17 +408,29 @@ out_req:
 }
 
 int fld_client_create(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t mds)
+                      seqno_t seq, mdsno_t mds,
+                      const struct lu_env *env)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int rc;
         ENTRY;
 
         target = fld_client_get_target(fld, seq);
         LASSERT(target != NULL);
 
-        rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+        if (target->ft_srv != NULL) {
+                LASSERT(env != NULL);
+                rc = fld_server_create(target->ft_srv,
+                                       env, seq, mds);
+        } else {
+#endif
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+        }
+#endif
 
         if (rc == 0) {
                 /*
@@ -395,42 +439,56 @@ int fld_client_create(struct lu_client_fld *fld,
                  * reason is that, we do not want to stop proceeding because of
                  * cache errors. --umka
                  */
-                fld_cache_insert(fld->fld_cache, seq, mds);
+                fld_cache_insert(fld->lcf_cache, seq, mds);
+        } else {
+                CERROR("%s: Can't create FLD entry, rc %d\n",
+                       fld->lcf_name, rc);
         }
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_create);
 
-int fld_client_delete(struct lu_client_fld *fld,
-                      seqno_t seq)
+int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
+                      const struct lu_env *env)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int rc;
         ENTRY;
 
-        fld_cache_delete(fld->fld_cache, seq);
+        fld_cache_delete(fld->lcf_cache, seq);
 
         target = fld_client_get_target(fld, seq);
         LASSERT(target != NULL);
 
-        rc = fld_client_rpc(target->fldt_exp,
-                            &md_fld, FLD_DELETE);
-        
+#ifdef __KERNEL__
+        if (target->ft_srv != NULL) {
+                LASSERT(env != NULL);
+                rc = fld_server_delete(target->ft_srv,
+                                       env, seq);
+        } else {
+#endif
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+        }
+#endif
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_delete);
 
 int fld_client_lookup(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t *mds)
+                      seqno_t seq, mdsno_t *mds,
+                      const struct lu_env *env)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int rc;
         ENTRY;
 
         /* lookup it in the cache */
-        rc = fld_cache_lookup(fld->fld_cache, seq, mds);
+        rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
         if (rc == 0)
                 RETURN(0);
 
@@ -438,8 +496,18 @@ int fld_client_lookup(struct lu_client_fld *fld,
         target = fld_client_get_target(fld, seq);
         LASSERT(target != NULL);
 
-        rc = fld_client_rpc(target->fldt_exp,
-                            &md_fld, FLD_LOOKUP);
+#ifdef __KERNEL__
+        if (target->ft_srv != NULL) {
+                LASSERT(env != NULL);
+                rc = fld_server_lookup(target->ft_srv,
+                                       env, seq, &md_fld.mf_mds);
+        } else {
+#endif
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_LOOKUP);
+#ifdef __KERNEL__
+        }
+#endif
         if (rc == 0) {
                 *mds = md_fld.mf_mds;
 
@@ -447,7 +515,7 @@ int fld_client_lookup(struct lu_client_fld *fld,
                  * Do not return error here as well. See previous comment in
                  * same situation in function fld_client_create(). --umka
                  */
-                fld_cache_insert(fld->fld_cache, seq, *mds);
+                fld_cache_insert(fld->lcf_cache, seq, *mds);
         }
         RETURN(rc);
 }