site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
LASSERT(site != NULL);
- req_capsule_pack(&info->sti_pill);
+ rc = req_capsule_pack(&info->sti_pill);
+ if (rc)
+ RETURN(rc);
opc = req_capsule_client_get(&info->sti_pill,
&RMF_SEQ_OPC);
proc_lustre_root,
NULL, NULL);
if (IS_ERR(seq->lss_proc_dir)) {
- CERROR("LProcFS failed in seq-init\n");
rc = PTR_ERR(seq->lss_proc_dir);
RETURN(rc);
}
seq->lss_proc_dir,
NULL, NULL);
if (IS_ERR(seq->lss_proc_entry)) {
- CERROR("LProcFS failed in seq-init\n");
rc = PTR_ERR(seq->lss_proc_entry);
GOTO(out_cleanup, rc);
}
#ifdef LPROCFS
/*
- * Server side procfs stuff.
- *
* Note: this function is only used for testing, it is no safe for production
* use.
*/
RETURN(0);
}
+/*
+ * Server side procfs stuff.
+ */
static int
seq_proc_read_common(char *page, char **start, off_t off,
int count, int *eof, void *data,
#include "fld_internal.h"
#ifdef __KERNEL__
+static inline __u32 fld_cache_hash(seqno_t seq)
+{
+ return (__u32)seq;
+}
+
struct fld_cache_info *fld_cache_init(int size)
{
struct fld_cache_info *cache;
ENTRY;
/* check if size is power of two */
- LASSERT((size & -size) == size);
+ LASSERT(IS_PO2(size));
OBD_ALLOC_PTR(cache);
if (cache == NULL)
/* init fld cache info */
cache->fci_hash_mask = size - 1;
- OBD_ALLOC(cache->fci_hash, size *
- sizeof(*cache->fci_hash));
+ OBD_ALLOC(cache->fci_hash, size * sizeof(*cache->fci_hash));
if (cache->fci_hash == NULL) {
OBD_FREE_PTR(cache);
RETURN(ERR_PTR(-ENOMEM));
}
EXPORT_SYMBOL(fld_cache_fini);
+static inline struct hlist_head *
+fld_cache_bucket(struct fld_cache_info *cache, seqno_t seq)
+{
+ return cache->fci_hash + (fld_cache_hash(seq) &
+ cache->fci_hash_mask);
+}
+
int fld_cache_insert(struct fld_cache_info *cache,
seqno_t seq, mdsno_t mds)
{
int rc = 0;
ENTRY;
+ bucket = fld_cache_bucket(cache, seq);
+
+ spin_lock(&cache->fci_lock);
+ hlist_for_each_entry(fldt, scan, bucket, fce_list) {
+ if (fldt->fce_seq == seq)
+ spin_unlock(&cache->fci_lock);
+ RETURN(rc = -EEXIST);
+ }
+ spin_unlock(&cache->fci_lock);
+
OBD_ALLOC_PTR(flde);
if (!flde)
RETURN(-ENOMEM);
- bucket = cache->fci_hash + (fld_cache_hash(seq) &
- cache->fci_hash_mask);
-
spin_lock(&cache->fci_lock);
hlist_for_each_entry(fldt, scan, bucket, fce_list) {
- if (fldt->fce_seq == seq)
- GOTO(exit_unlock, rc = -EEXIST);
+ if (fldt->fce_seq == seq) {
+ spin_unlock(&cache->fci_lock);
+ OBD_FREE_PTR(flde);
+ RETURN(0);
+ }
}
-
INIT_HLIST_NODE(&flde->fce_list);
flde->fce_mds = mds;
flde->fce_seq = seq;
hlist_add_head(&flde->fce_list, bucket);
-
- EXIT;
-exit_unlock:
spin_unlock(&cache->fci_lock);
- if (rc != 0)
- OBD_FREE_PTR(flde);
- return rc;
+
+ RETURN(0);
}
EXPORT_SYMBOL(fld_cache_insert);
struct hlist_node *scan;
ENTRY;
- bucket = cache->fci_hash + (fld_cache_hash(seq) &
- cache->fci_hash_mask);
+ bucket = fld_cache_bucket(cache, seq);
spin_lock(&cache->fci_lock);
hlist_for_each_entry(flde, scan, bucket, fce_list) {
struct hlist_node *scan;
ENTRY;
- bucket = cache->fci_hash + (fld_cache_hash(seq) &
- cache->fci_hash_mask);
+ bucket = fld_cache_bucket(cache, seq);
spin_lock(&cache->fci_lock);
hlist_for_each_entry(flde, scan, bucket, fce_list) {
const struct lu_context *ctx,
seqno_t seq, mdsno_t mds)
{
- ENTRY;
- RETURN(fld_index_create(fld, ctx, seq, mds));
+ return fld_index_create(fld, ctx, seq, mds);
}
EXPORT_SYMBOL(fld_server_create);
const struct lu_context *ctx,
seqno_t seq)
{
- ENTRY;
- RETURN(fld_index_delete(fld, ctx, seq));
+ return fld_index_delete(fld, ctx, seq);
}
EXPORT_SYMBOL(fld_server_delete);
const struct lu_context *ctx,
seqno_t seq, mdsno_t *mds)
{
- ENTRY;
- RETURN(fld_index_lookup(fld, ctx, seq, mds));
+ return fld_index_lookup(fld, ctx, seq, mds);
}
EXPORT_SYMBOL(fld_server_lookup);
__u32 *opc;
ENTRY;
- req_capsule_pack(&info->fti_pill);
+ rc = req_capsule_pack(&info->fti_pill);
+ if (rc)
+ RETURN(rc);
opc = req_capsule_client_get(&info->fti_pill, &RMF_FLD_OPC);
if (opc != NULL) {
in = req_capsule_client_get(&info->fti_pill, &RMF_FLD_MDFLD);
- if (in == NULL) {
- CERROR("cannot unpack fld request\n");
+ if (in == NULL)
RETURN(-EPROTO);
- }
out = req_capsule_server_get(&info->fti_pill, &RMF_FLD_MDFLD);
- if (out == NULL) {
- CERROR("cannot allocate fld response\n");
+ if (out == NULL)
RETURN(-EPROTO);
- }
*out = *in;
rc = fld_server_handle(fld, ctx, *opc, out);
- } else {
- CERROR("cannot unpack FLD operation\n");
}
RETURN(rc);
}
struct lu_context_key fld_thread_key = {
- .lct_tags = LCT_MD_THREAD,
+ .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
.lct_init = fld_thread_init,
.lct_fini = fld_thread_fini
};
static int fld_req_handle(struct ptlrpc_request *req)
{
- int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
const struct lu_context *ctx;
struct fld_thread_info *info;
struct lu_site *site;
GOTO(out_info, rc);
}
- target_send_reply(req, rc, fail);
+ target_send_reply(req, rc, OBD_FAIL_FLD_ALL_REPLY_NET);
EXIT;
out_info:
fld_thread_info_fini(info);
proc_lustre_root,
NULL, NULL);
if (IS_ERR(fld->fld_proc_dir)) {
- CERROR("LProcFS failed in fld-init\n");
rc = PTR_ERR(fld->fld_proc_dir);
RETURN(rc);
}
fld->fld_proc_dir,
NULL, NULL);
if (IS_ERR(fld->fld_proc_entry)) {
- CERROR("LProcFS failed in fld-init\n");
rc = PTR_ERR(fld->fld_proc_entry);
GOTO(out_cleanup, rc);
}
#include <linux/types.h>
-struct fld_target {
- struct list_head fldt_chain;
- struct obd_export *fldt_exp;
- __u64 fldt_idx;
-};
-
enum fld_op {
FLD_CREATE = 0,
FLD_DELETE = 1,
FLD_LOOKUP = 2
};
-#define FLD_HTABLE_SIZE 256
+enum {
+ FLD_HTABLE_SIZE = 256
+};
-extern struct lu_fld_hash fld_hash[3];
+extern struct lu_fld_hash fld_hash[2];
#ifdef __KERNEL__
#define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
int fld_index_lookup(struct lu_server_fld *fld,
const struct lu_context *ctx,
seqno_t seq, mdsno_t *mds);
-
-static inline __u32 fld_cache_hash(seqno_t seq)
-{
- return (__u32)seq;
-}
#endif
#ifdef LPROCFS
static int fld_rrb_hash(struct lu_client_fld *fld,
seqno_t seq)
{
- if (fld->fld_count == 0)
- return 0;
-
+ LASSERT(fld->fld_count > 0);
return do_div(seq, fld->fld_count);
}
+static struct fld_target *
+fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
+{
+ struct fld_target *target;
+ int hash;
+ ENTRY;
+
+ hash = fld_rrb_hash(fld, seq);
+
+ list_for_each_entry(target, &fld->fld_targets, fldt_chain) {
+ if (target->fldt_idx == hash)
+ RETURN(target);
+ }
+
+ /* if target is not found, there is logical error anyway, so here is
+ * LBUG() to catch this situation. */
+ LBUG();
+ RETURN(NULL);
+}
+
static int fld_dht_hash(struct lu_client_fld *fld,
seqno_t seq)
{
return fld_rrb_hash(fld, seq);
}
-struct lu_fld_hash fld_hash[3] = {
+static struct fld_target *
+fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
+{
+ /* XXX: here should be DHT scan code */
+ return fld_dht_scan(fld, seq);
+}
+
+struct lu_fld_hash fld_hash[2] = {
{
.fh_name = "DHT",
- .fh_func = fld_dht_hash
+ .fh_hash_func = fld_dht_hash,
+ .fh_scan_func = fld_dht_scan
},
{
- .fh_name = "Round Robin",
- .fh_func = fld_rrb_hash
- },
- {
- 0,
+ .fh_name = "RRB",
+ .fh_hash_func = fld_rrb_hash,
+ .fh_scan_func = fld_rrb_scan
}
};
-/* this function makes decision if passed @target appropriate acoordingly to
- * passed @hash. In case of usual round-robin hash, this is decided by comparing
- * hash and target's index. In the case of DHT, algorithm is a bit more
- * complicated. */
-static int fld_client_apt_target(struct fld_target *target,
- int hash)
-{
- /* XXX: DHT case should be worked out. */
- return (target->fldt_idx == hash);
-}
-
static struct fld_target *
fld_client_get_target(struct lu_client_fld *fld,
seqno_t seq)
{
struct fld_target *target;
- int hash;
ENTRY;
LASSERT(fld->fld_hash != NULL);
spin_lock(&fld->fld_lock);
- hash = fld->fld_hash->fh_func(fld, seq);
-
- list_for_each_entry(target,
- &fld->fld_targets, fldt_chain) {
- if (fld_client_apt_target(target, hash)) {
- spin_unlock(&fld->fld_lock);
- RETURN(target);
- }
- }
+ target = fld->fld_hash->fh_scan_func(fld, seq);
spin_unlock(&fld->fld_lock);
-
- /* if target is not found, there is logical error anyway, so here is
- * LBUG() to catch that situation. */
- LBUG();
- RETURN(NULL);
+
+ RETURN(target);
}
-/* add export to FLD. This is usually done by CMM and LMV as they are main users
- * of FLD module. */
+/*
+ * Add export to FLD. This is usually done by CMM and LMV as they are main users
+ * of FLD module.
+ */
int fld_client_add_target(struct lu_client_fld *fld,
struct obd_export *exp)
{
LASSERT(fld != NULL);
if (!hash_is_sane(hash)) {
- CERROR("wrong hash function 0x%x\n", hash);
+ CERROR("wrong hash function %#x\n", hash);
RETURN(-EINVAL);
}
ENTRY;
target = fld_client_get_target(fld, seq);
- if (!target)
- RETURN(-EINVAL);
+ LASSERT(target != NULL);
rc = fld_client_rpc(target->fldt_exp, md_fld, FLD_CREATE);
fld_cache_delete(fld->fld_cache, seq);
target = fld_client_get_target(fld, seq);
- if (!target)
- RETURN(-EINVAL);
+ LASSERT(target != NULL);
rc = fld_client_rpc(target->fldt_exp,
md_fld, FLD_DELETE);
/* can not find it in the cache */
target = fld_client_get_target(fld, seq);
- if (!target)
- RETURN(-EINVAL);
+ LASSERT(target != NULL);
rc = fld_client_rpc(target->fldt_exp,
md_fld, FLD_LOOKUP);
LASSERT(fld != NULL);
- for (i = 0; i < sizeof(fld_hash) / sizeof(*hash); i++) {
+ for (i = 0; i < ARRAY_SIZE(fld_hash); i++) {
if (fld_hash[i].fh_name == NULL ||
count != strlen(fld_hash[i].fh_name))
continue;
LUSTRE_CLI_FLD_HASH_RRB
};
+struct fld_target {
+ struct list_head fldt_chain;
+ struct obd_export *fldt_exp;
+ __u64 fldt_idx;
+};
+
typedef int (*fld_hash_func_t) (struct lu_client_fld *, __u64);
+typedef struct fld_target * (*fld_scan_func_t) (struct lu_client_fld *, __u64);
struct lu_fld_hash {
const char *fh_name;
- fld_hash_func_t fh_func;
+ fld_hash_func_t fh_hash_func;
+ fld_scan_func_t fh_scan_func;
};
struct lu_server_fld {