Whamcloud - gitweb
Introduction of lu_env.
[fs/lustre-release.git] / lustre / fld / fld_request.c
index b1b66b9..1933b27 100644 (file)
@@ -73,8 +73,10 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
                         RETURN(target);
         }
 
-        /* if target is not found, there is logical error anyway, so here is
-         * LBUG() to catch this situation. */
+        /*
+         * If target is not found, there is logical error anyway, so here is
+         * LBUG() to catch this situation.
+         */
         LBUG();
         RETURN(NULL);
 }
@@ -132,16 +134,16 @@ fld_client_get_target(struct lu_client_fld *fld,
 int fld_client_add_target(struct lu_client_fld *fld,
                           struct lu_fld_target *tar)
 {
-        const char *tar_name = fld_target_name(tar);
+        const char *name = fld_target_name(tar);
         struct lu_fld_target *target, *tmp;
         ENTRY;
 
         LASSERT(tar != NULL);
-        LASSERT(tar_name != NULL);
+        LASSERT(name != NULL);
         LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
 
-        CDEBUG(D_INFO|D_WARNING, "%s: adding target %s\n",
-              fld->lcf_name, tar_name);
+        CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s\n",
+              fld->lcf_name, name);
 
         OBD_ALLOC_PTR(target);
         if (target == NULL)
@@ -149,24 +151,24 @@ int fld_client_add_target(struct lu_client_fld *fld,
 
         spin_lock(&fld->lcf_lock);
         list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
-                const char *tmp_name = fld_target_name(tmp);
-                
-                if (strlen(tar_name) == strlen(tmp_name) &&
-                    strcmp(tmp_name, tar_name) == 0)
-                {
+                if (tmp->ft_idx == tar->ft_idx) {
                         spin_unlock(&fld->lcf_lock);
                         OBD_FREE_PTR(target);
+                        CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
+                               name, fld_target_name(tmp), tmp->ft_idx);
                         RETURN(-EEXIST);
                 }
         }
 
         target->ft_exp = tar->ft_exp;
+        if (target->ft_exp != NULL)
+                class_export_get(target->ft_exp);
         target->ft_srv = tar->ft_srv;
         target->ft_idx = tar->ft_idx;
 
         list_add_tail(&target->ft_chain,
                       &fld->lcf_targets);
-        
+
         fld->lcf_count++;
         spin_unlock(&fld->lcf_lock);
 
@@ -174,7 +176,7 @@ int fld_client_add_target(struct lu_client_fld *fld,
 }
 EXPORT_SYMBOL(fld_client_add_target);
 
-/* remove export from FLD */
+/* Remove export from FLD */
 int fld_client_del_target(struct lu_client_fld *fld,
                           __u64 idx)
 {
@@ -188,7 +190,10 @@ int fld_client_del_target(struct lu_client_fld *fld,
                         fld->lcf_count--;
                         list_del(&target->ft_chain);
                         spin_unlock(&fld->lcf_lock);
-                        class_export_put(target->ft_exp);
+
+                        if (target->ft_exp != NULL)
+                                class_export_put(target->ft_exp);
+
                         OBD_FREE_PTR(target);
                         RETURN(0);
                 }
@@ -207,11 +212,12 @@ static int fld_client_proc_init(struct lu_client_fld *fld)
         ENTRY;
 
         fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
-                                             proc_lustre_root,
+                                             fld_type_proc_dir,
                                              NULL, NULL);
 
         if (IS_ERR(fld->lcf_proc_dir)) {
-                CERROR("LProcFS failed in fld-init\n");
+                CERROR("%s: LProcFS failed in fld-init\n",
+                       fld->lcf_name);
                 rc = PTR_ERR(fld->lcf_proc_dir);
                 RETURN(rc);
         }
@@ -219,8 +225,8 @@ static int fld_client_proc_init(struct lu_client_fld *fld)
         rc = lprocfs_add_vars(fld->lcf_proc_dir,
                               fld_client_proc_list, fld);
         if (rc) {
-                CERROR("can't init FLD "
-                       "proc, rc %d\n", rc);
+                CERROR("%s: Can't init FLD proc, rc %d\n",
+                       fld->lcf_name, rc);
                 GOTO(out_cleanup, rc);
         }
 
@@ -265,8 +271,7 @@ static inline int hash_is_sane(int hash)
 #define FLD_CACHE_THRESHOLD 10
 
 int fld_client_init(struct lu_client_fld *fld,
-                    const char *prefix, int hash,
-                    const struct lu_context *ctx)
+                    const char *prefix, int hash)
 {
 #ifdef __KERNEL__
         int cache_size, cache_threshold;
@@ -276,20 +281,20 @@ int fld_client_init(struct lu_client_fld *fld,
 
         LASSERT(fld != NULL);
 
+        snprintf(fld->lcf_name, sizeof(fld->lcf_name),
+                 "cli-%s", prefix);
+
         if (!hash_is_sane(hash)) {
-                CERROR("Wrong hash function %#x\n", hash);
+                CERROR("%s: Wrong hash function %#x\n",
+                       fld->lcf_name, hash);
                 RETURN(-EINVAL);
         }
 
         fld->lcf_count = 0;
-        fld->lcf_ctx = ctx;
         spin_lock_init(&fld->lcf_lock);
         fld->lcf_hash = &fld_hash[hash];
         INIT_LIST_HEAD(&fld->lcf_targets);
 
-        snprintf(fld->lcf_name, sizeof(fld->lcf_name),
-                 "%s-cli-%s", LUSTRE_FLD_NAME, prefix);
-
 #ifdef __KERNEL__
         cache_size = FLD_CACHE_SIZE /
                 sizeof(struct fld_cache_entry);
@@ -316,8 +321,8 @@ out:
                 fld_client_fini(fld);
         else
                 CDEBUG(D_INFO|D_WARNING,
-                       "Client FLD, using \"%s\" hash\n",
-                       fld->lcf_hash->fh_name);
+                       "%s: Using \"%s\" hash\n",
+                       fld->lcf_name, fld->lcf_hash->fh_name);
         return rc;
 }
 EXPORT_SYMBOL(fld_client_init);
@@ -348,7 +353,6 @@ void fld_client_fini(struct lu_client_fld *fld)
         }
 #endif
 
-        CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
         EXIT;
 }
 EXPORT_SYMBOL(fld_client_fini);
@@ -369,13 +373,13 @@ static int fld_client_rpc(struct obd_export *exp,
         LASSERT(exp != NULL);
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                              LUSTRE_MDS_VERSION, FLD_QUERY,
-                              3, size, NULL);
+                              LUSTRE_MDS_VERSION,
+                              FLD_QUERY, 3, size,
+                              NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
-
         req_capsule_set(&pill, &RQF_FLD_QUERY);
 
         op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
@@ -404,7 +408,8 @@ out_req:
 }
 
 int fld_client_create(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t mds)
+                      seqno_t seq, mdsno_t mds,
+                      const struct lu_env *env)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
         struct lu_fld_target *target;
@@ -416,10 +421,9 @@ int fld_client_create(struct lu_client_fld *fld,
 
 #ifdef __KERNEL__
         if (target->ft_srv != NULL) {
-                LASSERT(fld->lcf_ctx != NULL);
+                LASSERT(env != NULL);
                 rc = fld_server_create(target->ft_srv,
-                                       fld->lcf_ctx,
-                                       seq, mds);
+                                       env, seq, mds);
         } else {
 #endif
                 rc = fld_client_rpc(target->ft_exp,
@@ -436,13 +440,16 @@ int fld_client_create(struct lu_client_fld *fld,
                  * cache errors. --umka
                  */
                 fld_cache_insert(fld->lcf_cache, seq, mds);
+        } else {
+                CERROR("%s: Can't create FLD entry, rc %d\n",
+                       fld->lcf_name, rc);
         }
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_create);
 
-int fld_client_delete(struct lu_client_fld *fld,
-                      seqno_t seq)
+int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
+                      const struct lu_env *env)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
         struct lu_fld_target *target;
@@ -456,10 +463,9 @@ int fld_client_delete(struct lu_client_fld *fld,
 
 #ifdef __KERNEL__
         if (target->ft_srv != NULL) {
-                LASSERT(fld->lcf_ctx != NULL);
+                LASSERT(env != NULL);
                 rc = fld_server_delete(target->ft_srv,
-                                       fld->lcf_ctx,
-                                       seq);
+                                       env, seq);
         } else {
 #endif
                 rc = fld_client_rpc(target->ft_exp,
@@ -473,7 +479,8 @@ int fld_client_delete(struct lu_client_fld *fld,
 EXPORT_SYMBOL(fld_client_delete);
 
 int fld_client_lookup(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t *mds)
+                      seqno_t seq, mdsno_t *mds,
+                      const struct lu_env *env)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
         struct lu_fld_target *target;
@@ -491,10 +498,9 @@ int fld_client_lookup(struct lu_client_fld *fld,
 
 #ifdef __KERNEL__
         if (target->ft_srv != NULL) {
-                LASSERT(fld->lcf_ctx != NULL);
+                LASSERT(env != NULL);
                 rc = fld_server_lookup(target->ft_srv,
-                                       fld->lcf_ctx,
-                                       seq, mds);
+                                       env, seq, &md_fld.mf_mds);
         } else {
 #endif
                 rc = fld_client_rpc(target->ft_exp,