static int fld_rrb_hash(struct lu_client_fld *fld,
seqno_t seq)
{
- if (fld->fld_count == 0)
- return 0;
+ LASSERT(fld->lcf_count > 0);
+ return do_div(seq, fld->lcf_count);
+}
+
+static struct lu_fld_target *
+fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
+{
+ struct lu_fld_target *target;
+ int hash;
+ ENTRY;
+
+ hash = fld_rrb_hash(fld, seq);
- return do_div(seq, fld->fld_count);
+ list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
+ if (target->ft_idx == hash)
+ RETURN(target);
+ }
+
+ /*
+ * If target is not found, there is logical error anyway, so here is
+ * LBUG() to catch this situation.
+ */
+ LBUG();
+ RETURN(NULL);
}
static int fld_dht_hash(struct lu_client_fld *fld,
return fld_rrb_hash(fld, seq);
}
+static struct lu_fld_target *
+fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
+{
+ /* XXX: here should be DHT scan code */
+ return fld_rrb_scan(fld, seq);
+}
+
struct lu_fld_hash fld_hash[3] = {
{
.fh_name = "DHT",
- .fh_func = fld_dht_hash
+ .fh_hash_func = fld_dht_hash,
+ .fh_scan_func = fld_dht_scan
},
{
- .fh_name = "Round Robin",
- .fh_func = fld_rrb_hash
+ .fh_name = "RRB",
+ .fh_hash_func = fld_rrb_hash,
+ .fh_scan_func = fld_rrb_scan
},
{
0,
}
};
-/* this function makes decision if passed @target appropriate acoordingly to
- * passed @hash. In case of usual round-robin hash, this is decided by comparing
- * hash and target's index. In the case of DHT, algorithm is a bit more
- * complicated. */
-static int fld_client_apt_target(struct fld_target *target,
- int hash)
-{
- /* XXX: DHT case should be worked out. */
- return (target->fldt_idx == hash);
-}
-
-static struct fld_target *
+static struct lu_fld_target *
fld_client_get_target(struct lu_client_fld *fld,
seqno_t seq)
{
- struct fld_target *target;
- int hash;
+ struct lu_fld_target *target;
ENTRY;
- LASSERT(fld->fld_hash != NULL);
+ LASSERT(fld->lcf_hash != NULL);
- spin_lock(&fld->fld_lock);
- hash = fld->fld_hash->fh_func(fld, seq);
+ spin_lock(&fld->lcf_lock);
+ target = fld->lcf_hash->fh_scan_func(fld, seq);
+ spin_unlock(&fld->lcf_lock);
- list_for_each_entry(target,
- &fld->fld_targets, fldt_chain) {
- if (fld_client_apt_target(target, hash)) {
- spin_unlock(&fld->fld_lock);
- RETURN(target);
- }
- }
- spin_unlock(&fld->fld_lock);
-
- /* if target is not found, there is logical error anyway, so here is
- * LBUG() to catch that situation. */
- LBUG();
- RETURN(NULL);
+ RETURN(target);
}
-/* add export to FLD. This is usually done by CMM and LMV as they are main users
- * of FLD module. */
+/*
+ * Add export to FLD. This is usually done by CMM and LMV as they are main users
+ * of FLD module.
+ */
int fld_client_add_target(struct lu_client_fld *fld,
- struct obd_export *exp)
+ struct lu_fld_target *tar)
{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- struct fld_target *target, *tmp;
+ const char *name = fld_target_name(tar);
+ struct lu_fld_target *target, *tmp;
ENTRY;
- LASSERT(exp != NULL);
+ LASSERT(tar != NULL);
+ LASSERT(name != NULL);
+ LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
- CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
- fld->fld_name, cli->cl_target_uuid.uuid);
+ CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s\n",
+ fld->lcf_name, name);
OBD_ALLOC_PTR(target);
if (target == NULL)
RETURN(-ENOMEM);
- spin_lock(&fld->fld_lock);
- list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
- if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
- &exp->exp_client_uuid))
- {
- spin_unlock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
+ list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
+ if (tmp->ft_idx == tar->ft_idx) {
+ spin_unlock(&fld->lcf_lock);
OBD_FREE_PTR(target);
+ CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
+ name, fld_target_name(tmp), tmp->ft_idx);
RETURN(-EEXIST);
}
}
- target->fldt_exp = class_export_get(exp);
- target->fldt_idx = fld->fld_count;
+ target->ft_exp = tar->ft_exp;
+ if (target->ft_exp != NULL)
+ class_export_get(target->ft_exp);
+ target->ft_srv = tar->ft_srv;
+ target->ft_idx = tar->ft_idx;
- list_add_tail(&target->fldt_chain,
- &fld->fld_targets);
- fld->fld_count++;
- spin_unlock(&fld->fld_lock);
+ list_add_tail(&target->ft_chain,
+ &fld->lcf_targets);
+
+ fld->lcf_count++;
+ spin_unlock(&fld->lcf_lock);
RETURN(0);
}
EXPORT_SYMBOL(fld_client_add_target);
-/* remove export from FLD */
+/* Remove export from FLD */
int fld_client_del_target(struct lu_client_fld *fld,
- struct obd_export *exp)
+ __u64 idx)
{
- struct fld_target *target, *tmp;
+ struct lu_fld_target *target, *tmp;
ENTRY;
- spin_lock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
list_for_each_entry_safe(target, tmp,
- &fld->fld_targets, fldt_chain) {
- if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
- &exp->exp_client_uuid))
- {
- fld->fld_count--;
- list_del(&target->fldt_chain);
- spin_unlock(&fld->fld_lock);
- class_export_put(target->fldt_exp);
+ &fld->lcf_targets, ft_chain) {
+ if (target->ft_idx == idx) {
+ fld->lcf_count--;
+ list_del(&target->ft_chain);
+ spin_unlock(&fld->lcf_lock);
+
+ if (target->ft_exp != NULL)
+ class_export_put(target->ft_exp);
+
OBD_FREE_PTR(target);
RETURN(0);
}
}
- spin_unlock(&fld->fld_lock);
+ spin_unlock(&fld->lcf_lock);
RETURN(-ENOENT);
}
EXPORT_SYMBOL(fld_client_del_target);
int rc;
ENTRY;
- fld->fld_proc_dir = lprocfs_register(fld->fld_name,
- proc_lustre_root,
+ fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
+ fld_type_proc_dir,
NULL, NULL);
- if (IS_ERR(fld->fld_proc_dir)) {
- CERROR("LProcFS failed in fld-init\n");
- rc = PTR_ERR(fld->fld_proc_dir);
+ if (IS_ERR(fld->lcf_proc_dir)) {
+ CERROR("%s: LProcFS failed in fld-init\n",
+ fld->lcf_name);
+ rc = PTR_ERR(fld->lcf_proc_dir);
RETURN(rc);
}
- rc = lprocfs_add_vars(fld->fld_proc_dir,
+ rc = lprocfs_add_vars(fld->lcf_proc_dir,
fld_client_proc_list, fld);
if (rc) {
- CERROR("can't init FLD "
- "proc, rc %d\n", rc);
+ CERROR("%s: Can't init FLD proc, rc %d\n",
+ fld->lcf_name, rc);
GOTO(out_cleanup, rc);
}
static void fld_client_proc_fini(struct lu_client_fld *fld)
{
ENTRY;
- if (fld->fld_proc_dir) {
- if (!IS_ERR(fld->fld_proc_dir))
- lprocfs_remove(fld->fld_proc_dir);
- fld->fld_proc_dir = NULL;
+ if (fld->lcf_proc_dir) {
+ if (!IS_ERR(fld->lcf_proc_dir))
+ lprocfs_remove(fld->lcf_proc_dir);
+ fld->lcf_proc_dir = NULL;
}
EXIT;
}
return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
}
+/* 1M of FLD cache will not hurt client a lot */
+#define FLD_CACHE_SIZE 1024000
+
+/* cache threshold is 10 percent of size */
+#define FLD_CACHE_THRESHOLD 10
+
int fld_client_init(struct lu_client_fld *fld,
- const char *uuid, int hash)
+ const char *prefix, int hash)
{
- int rc = 0;
+#ifdef __KERNEL__
+ int cache_size, cache_threshold;
+#endif
+ int rc;
ENTRY;
LASSERT(fld != NULL);
+ snprintf(fld->lcf_name, sizeof(fld->lcf_name),
+ "cli-%s", prefix);
+
if (!hash_is_sane(hash)) {
- CERROR("wrong hash function 0x%x\n", hash);
+ CERROR("%s: Wrong hash function %#x\n",
+ fld->lcf_name, hash);
RETURN(-EINVAL);
}
- INIT_LIST_HEAD(&fld->fld_targets);
- spin_lock_init(&fld->fld_lock);
- fld->fld_hash = &fld_hash[hash];
- fld->fld_count = 0;
-
- snprintf(fld->fld_name, sizeof(fld->fld_name),
- "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
+ fld->lcf_count = 0;
+ spin_lock_init(&fld->lcf_lock);
+ fld->lcf_hash = &fld_hash[hash];
+ INIT_LIST_HEAD(&fld->lcf_targets);
#ifdef __KERNEL__
- fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
- if (IS_ERR(fld->fld_cache)) {
- rc = PTR_ERR(fld->fld_cache);
- fld->fld_cache = NULL;
+ cache_size = FLD_CACHE_SIZE /
+ sizeof(struct fld_cache_entry);
+
+ cache_threshold = cache_size *
+ FLD_CACHE_THRESHOLD / 100;
+
+ fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
+ cache_size,
+ cache_threshold);
+ if (IS_ERR(fld->lcf_cache)) {
+ rc = PTR_ERR(fld->lcf_cache);
+ fld->lcf_cache = NULL;
GOTO(out, rc);
}
#endif
fld_client_fini(fld);
else
CDEBUG(D_INFO|D_WARNING,
- "Client FLD, using \"%s\" hash\n",
- fld->fld_hash->fh_name);
+ "%s: Using \"%s\" hash\n",
+ fld->lcf_name, fld->lcf_hash->fh_name);
return rc;
}
EXPORT_SYMBOL(fld_client_init);
void fld_client_fini(struct lu_client_fld *fld)
{
- struct fld_target *target, *tmp;
+ struct lu_fld_target *target, *tmp;
ENTRY;
fld_client_proc_fini(fld);
- spin_lock(&fld->fld_lock);
+ spin_lock(&fld->lcf_lock);
list_for_each_entry_safe(target, tmp,
- &fld->fld_targets, fldt_chain) {
- fld->fld_count--;
- list_del(&target->fldt_chain);
- class_export_put(target->fldt_exp);
+ &fld->lcf_targets, ft_chain) {
+ fld->lcf_count--;
+ list_del(&target->ft_chain);
+ if (target->ft_exp != NULL)
+ class_export_put(target->ft_exp);
OBD_FREE_PTR(target);
}
- spin_unlock(&fld->fld_lock);
+ spin_unlock(&fld->lcf_lock);
#ifdef __KERNEL__
- if (fld->fld_cache != NULL) {
- fld_cache_fini(fld->fld_cache);
- fld->fld_cache = NULL;
+ if (fld->lcf_cache != NULL) {
+ if (!IS_ERR(fld->lcf_cache))
+ fld_cache_fini(fld->lcf_cache);
+ fld->lcf_cache = NULL;
}
#endif
- CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
EXIT;
}
EXPORT_SYMBOL(fld_client_fini);
static int fld_client_rpc(struct obd_export *exp,
struct md_fld *mf, __u32 fld_op)
{
- int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
- int mf_size = sizeof(struct md_fld);
+ int size[3] = { sizeof(struct ptlrpc_body),
+ sizeof(__u32),
+ sizeof(struct md_fld) };
struct ptlrpc_request *req;
struct req_capsule pill;
struct md_fld *pmf;
__u32 *op;
+ int rc;
ENTRY;
LASSERT(exp != NULL);
req = ptlrpc_prep_req(class_exp2cliimp(exp),
- LUSTRE_MDS_VERSION, FLD_QUERY,
- 2, size, NULL);
+ LUSTRE_MDS_VERSION,
+ FLD_QUERY, 3, size,
+ NULL);
if (req == NULL)
RETURN(-ENOMEM);
req_capsule_init(&pill, req, RCL_CLIENT, NULL);
-
req_capsule_set(&pill, &RQF_FLD_QUERY);
op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
*pmf = *mf;
- req->rq_replen = lustre_msg_size(1, &mf_size);
+ size[1] = sizeof(struct md_fld);
+ ptlrpc_req_set_repsize(req, 2, size);
req->rq_request_portal = FLD_REQUEST_PORTAL;
rc = ptlrpc_queue_wait(req);
return rc;
}
-static int __fld_client_create(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t mds,
- struct md_fld *md_fld)
+int fld_client_create(struct lu_client_fld *fld,
+ seqno_t seq, mdsno_t mds,
+ const struct lu_env *env)
{
- struct fld_target *target;
- __u32 rc;
+ struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
+ struct lu_fld_target *target;
+ int rc;
ENTRY;
target = fld_client_get_target(fld, seq);
- if (!target)
- RETURN(-EINVAL);
+ LASSERT(target != NULL);
- rc = fld_client_rpc(target->fldt_exp, md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+ if (target->ft_srv != NULL) {
+ LASSERT(env != NULL);
+ rc = fld_server_create(target->ft_srv,
+ env, seq, mds);
+ } else {
+#endif
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+ }
+#endif
- if (rc == 0) {
- /* do not return result of calling fld_cache_insert()
+ if (rc == 0) {
+ /*
+ * Do not return result of calling fld_cache_insert()
* here. First of all because it may return -EEXISTS. Another
* reason is that, we do not want to stop proceeding because of
- * cache errors. --umka */
- fld_cache_insert(fld->fld_cache, seq, mds);
+ * cache errors. --umka
+ */
+ fld_cache_insert(fld->lcf_cache, seq, mds);
+ } else {
+ CERROR("%s: Can't create FLD entry, rc %d\n",
+ fld->lcf_name, rc);
}
-
- RETURN(rc);
-}
-
-int fld_client_create(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t mds)
-{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
- __u32 rc;
- ENTRY;
-
- rc = __fld_client_create(fld, seq, mds, &md_fld);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_create);
-static int __fld_client_delete(struct lu_client_fld *fld,
- seqno_t seq, struct md_fld *md_fld)
+int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
+ const struct lu_env *env)
{
- struct fld_target *target;
- __u32 rc;
+ struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+ struct lu_fld_target *target;
+ int rc;
+ ENTRY;
- fld_cache_delete(fld->fld_cache, seq);
+ fld_cache_delete(fld->lcf_cache, seq);
target = fld_client_get_target(fld, seq);
- if (!target)
- RETURN(-EINVAL);
-
- rc = fld_client_rpc(target->fldt_exp,
- md_fld, FLD_DELETE);
- RETURN(rc);
-}
+ LASSERT(target != NULL);
-int fld_client_delete(struct lu_client_fld *fld,
- seqno_t seq)
-{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
- __u32 rc;
+#ifdef __KERNEL__
+ if (target->ft_srv != NULL) {
+ LASSERT(env != NULL);
+ rc = fld_server_delete(target->ft_srv,
+ env, seq);
+ } else {
+#endif
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+ }
+#endif
- rc = __fld_client_delete(fld, seq, &md_fld);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_delete);
-static int __fld_client_lookup(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t *mds,
- struct md_fld *md_fld)
+int fld_client_lookup(struct lu_client_fld *fld,
+ seqno_t seq, mdsno_t *mds,
+ const struct lu_env *env)
{
- struct fld_target *target;
+ struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+ struct lu_fld_target *target;
int rc;
ENTRY;
/* lookup it in the cache */
- rc = fld_cache_lookup(fld->fld_cache, seq, mds);
+ rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
if (rc == 0)
RETURN(0);
/* can not find it in the cache */
target = fld_client_get_target(fld, seq);
- if (!target)
- RETURN(-EINVAL);
-
- rc = fld_client_rpc(target->fldt_exp,
- md_fld, FLD_LOOKUP);
- if (rc == 0)
- *mds = md_fld->mf_mds;
-
- /* do not return error here as well. See previous comment in same
- * situation in function fld_client_create(). --umka */
- fld_cache_insert(fld->fld_cache, seq, *mds);
-
- RETURN(rc);
-}
+ LASSERT(target != NULL);
-int fld_client_lookup(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t *mds)
-{
- struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
- int rc;
- ENTRY;
-
- rc = __fld_client_lookup(fld, seq, mds, &md_fld);
+#ifdef __KERNEL__
+ if (target->ft_srv != NULL) {
+ LASSERT(env != NULL);
+ rc = fld_server_lookup(target->ft_srv,
+ env, seq, &md_fld.mf_mds);
+ } else {
+#endif
+ rc = fld_client_rpc(target->ft_exp,
+ &md_fld, FLD_LOOKUP);
+#ifdef __KERNEL__
+ }
+#endif
+ if (rc == 0) {
+ *mds = md_fld.mf_mds;
+
+ /*
+ * Do not return error here as well. See previous comment in
+ * same situation in function fld_client_create(). --umka
+ */
+ fld_cache_insert(fld->lcf_cache, seq, *mds);
+ }
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_lookup);