- Remove needless locks in FLD server and client.
extern struct lu_context_key seq_thread_key;
+/* Functions used internally in the module. */
+int seq_client_alloc_super(struct lu_client_seq *seq,
+ const struct lu_env *env);
+
+int seq_client_replay_super(struct lu_client_seq *seq,
+ struct lu_range *range,
+ const struct lu_env *env);
+
+/* Store API functions. */
int seq_store_init(struct lu_server_seq *seq,
const struct lu_env *env,
struct dt_device *dt);
#include <lustre_mdc.h>
#include "fid_internal.h"
-static int seq_client_rpc(struct lu_client_seq *seq,
- struct lu_range *input,
- struct lu_range *output,
- __u32 opc, const char *opcname)
+static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
+ struct lu_range *output, __u32 opc,
+ const char *opcname)
{
int rc, size[3] = { sizeof(struct ptlrpc_body),
sizeof(__u32),
__u32 *op;
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp),
- LUSTRE_MDS_VERSION,
- SEQ_QUERY, 3, size,
- NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ SEQ_QUERY, 3, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
DRANGE"]\n", seq->lcs_name, PRANGE(output));
GOTO(out_req, rc = -EINVAL);
}
-
- /*
- * Save server response to request for recovery case, it will be sent to
- * server later if needed.
- */
*in = *out;
CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
}
/* Request sequence-controller node to allocate new super-sequence. */
-static int __seq_client_alloc_super(struct lu_client_seq *seq,
- const struct lu_env *env)
-{
- int rc;
-
-#ifdef __KERNEL__
- if (seq->lcs_srv) {
- LASSERT(env != NULL);
- rc = seq_server_alloc_super(seq->lcs_srv, NULL,
- &seq->lcs_space, env);
- } else {
-#endif
- rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
- SEQ_ALLOC_SUPER, "super");
-#ifdef __KERNEL__
- }
-#endif
- return rc;
-}
-
-int seq_client_alloc_super(struct lu_client_seq *seq,
- const struct lu_env *env)
+int seq_client_replay_super(struct lu_client_seq *seq,
+ struct lu_range *range,
+ const struct lu_env *env)
{
int rc;
ENTRY;
down(&seq->lcs_sem);
- rc = __seq_client_alloc_super(seq, env);
- up(&seq->lcs_sem);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_super);
-
-/* Request sequence-controller node to allocate new super-sequence. */
-static int __seq_client_replay_super(struct lu_client_seq *seq,
- struct lu_range *range,
- const struct lu_env *env)
-{
- int rc = 0;
-
+
#ifdef __KERNEL__
if (seq->lcs_srv) {
LASSERT(env != NULL);
&seq->lcs_space, env);
} else {
#endif
-#if 0
- /*
- * XXX: Seems we do not need to replay in case of remote
- * controller. Lustre anyway supports onlu signle failure
- * recovery.
- */
rc = seq_client_rpc(seq, range, &seq->lcs_space,
SEQ_ALLOC_SUPER, "super");
-#endif
#ifdef __KERNEL__
}
#endif
- return rc;
+ up(&seq->lcs_sem);
+ RETURN(rc);
}
-int seq_client_replay_super(struct lu_client_seq *seq,
- struct lu_range *range,
- const struct lu_env *env)
+/* Request sequence-controller node to allocate new super-sequence. */
+int seq_client_alloc_super(struct lu_client_seq *seq,
+ const struct lu_env *env)
{
- int rc;
ENTRY;
-
- down(&seq->lcs_sem);
- rc = __seq_client_replay_super(seq, range, env);
- up(&seq->lcs_sem);
-
- RETURN(rc);
+ RETURN(seq_client_replay_super(seq, NULL, env));
}
/* Request sequence-controller node to allocate new meta-sequence. */
-static int __seq_client_alloc_meta(struct lu_client_seq *seq,
- const struct lu_env *env)
+static int seq_client_alloc_meta(struct lu_client_seq *seq,
+ const struct lu_env *env)
{
int rc;
+ ENTRY;
#ifdef __KERNEL__
if (seq->lcs_srv) {
LASSERT(env != NULL);
rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
- &seq->lcs_space,
- env);
+ &seq->lcs_space, env);
} else {
#endif
rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
#ifdef __KERNEL__
}
#endif
- return rc;
-}
-
-int seq_client_alloc_meta(struct lu_client_seq *seq,
- const struct lu_env *env)
-{
- int rc;
- ENTRY;
-
- down(&seq->lcs_sem);
- rc = __seq_client_alloc_meta(seq, env);
- up(&seq->lcs_sem);
-
RETURN(rc);
}
-EXPORT_SYMBOL(seq_client_alloc_meta);
-/* allocate new sequence for client (llite or MDC are expected to use this) */
-static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+/* Allocate new sequence for client. */
+static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
{
- int rc = 0;
+ int rc;
ENTRY;
LASSERT(range_is_sane(&seq->lcs_space));
- /*
- * If we still have free sequences in meta-sequence we allocate new seq
- * from given range, if not - allocate new meta-sequence.
- */
- if (range_space(&seq->lcs_space) == 0) {
- rc = __seq_client_alloc_meta(seq, NULL);
+ if (range_is_exhausted(&seq->lcs_space)) {
+ rc = seq_client_alloc_meta(seq, NULL);
if (rc) {
CERROR("%s: Can't allocate new meta-sequence, "
"rc %d\n", seq->lcs_name, rc);
CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
seq->lcs_name, PRANGE(&seq->lcs_space));
}
+ } else {
+ rc = 0;
}
- LASSERT(range_space(&seq->lcs_space) > 0);
+ LASSERT(!range_is_exhausted(&seq->lcs_space));
*seqnr = seq->lcs_space.lr_start;
- seq->lcs_space.lr_start++;
-
- CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n",
- seq->lcs_name, *seqnr);
- RETURN(rc);
-}
-
-int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
-{
- int rc = 0;
- ENTRY;
-
- down(&seq->lcs_sem);
- rc = __seq_client_alloc_seq(seq, seqnr);
- up(&seq->lcs_sem);
+ seq->lcs_space.lr_start += 1;
+ CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
+ *seqnr);
+
RETURN(rc);
}
-EXPORT_SYMBOL(seq_client_alloc_seq);
+/* Allocate new fid on passed client @seq and save it to @fid. */
int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
{
int rc;
ENTRY;
+ LASSERT(seq != NULL);
LASSERT(fid != NULL);
down(&seq->lcs_sem);
- if (!fid_is_sane(&seq->lcs_fid) ||
+ if (fid_is_zero(&seq->lcs_fid) ||
fid_oid(&seq->lcs_fid) >= seq->lcs_width)
{
seqno_t seqnr;
- /*
- * Allocate new sequence for case client has no sequence at all
- * or sequence is exhausted and should be switched.
- */
- rc = __seq_client_alloc_seq(seq, &seqnr);
+ rc = seq_client_alloc_seq(seq, &seqnr);
if (rc) {
CERROR("%s: Can't allocate new sequence, "
"rc %d\n", seq->lcs_name, rc);
- GOTO(out, rc);
+ up(&seq->lcs_sem);
+ RETURN(rc);
}
+ CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence "
+ "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
+
seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
seq->lcs_fid.f_seq = seqnr;
seq->lcs_fid.f_ver = 0;
* to setup FLD for it.
*/
rc = 1;
-
- CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence "
- "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
} else {
- seq->lcs_fid.f_oid++;
+ /* Just bump last allocated fid and return to caller. */
+ seq->lcs_fid.f_oid += 1;
rc = 0;
}
-
+
*fid = seq->lcs_fid;
- LASSERT(fid_is_sane(fid));
-
- CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,
- PFID(fid));
-
- EXIT;
-out:
up(&seq->lcs_sem);
- return rc;
+
+ CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
+ RETURN(rc);
}
EXPORT_SYMBOL(seq_client_alloc_fid);
for (i = 0; i < hash_size; i++)
INIT_HLIST_HEAD(&cache->fci_hash_table[i]);
-
+ memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));
+
CDEBUG(D_INFO|D_WARNING, "%s: FLD cache - Size: %d, Threshold: %d\n",
cache->fci_name, cache_size, cache_threshold);
void fld_cache_fini(struct fld_cache *cache)
{
+ __u64 pct;
ENTRY;
LASSERT(cache != NULL);
fld_cache_flush(cache);
+ if (cache->fci_stat.fst_count > 0) {
+ pct = cache->fci_stat.fst_cache * 100;
+ do_div(pct, cache->fci_stat.fst_count);
+ } else {
+ pct = 0;
+ }
+
+ printk("FLD cache statistics (%s):\n", cache->fci_name);
+ printk(" Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
+ printk(" Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
+ printk(" Cache hits: "LPU64"%%\n", pct);
+
OBD_FREE(cache->fci_hash_table, cache->fci_hash_size *
sizeof(*cache->fci_hash_table));
OBD_FREE_PTR(cache);
bucket = fld_cache_bucket(cache, seq);
spin_lock(&cache->fci_lock);
+ cache->fci_stat.fst_count++;
hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
if (flde->fce_seq == seq) {
*mds = flde->fce_mds;
list_del(&flde->fce_lru);
list_add(&flde->fce_lru, &cache->fci_lru);
+ cache->fci_stat.fst_cache++;
spin_unlock(&cache->fci_lock);
RETURN(0);
}
int rc;
ENTRY;
- fld->lsf_stat.fst_count++;
-
/* Lookup it in the cache. */
rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
- if (rc == 0) {
- fld->lsf_stat.fst_cache++;
+ if (rc == 0)
RETURN(0);
- }
rc = fld_index_lookup(fld, env, seq, mds);
if (rc == 0) {
int rc;
ENTRY;
- down(&fld->lsf_sem);
-
switch (opc) {
case FLD_CREATE:
rc = fld_server_create(fld, env,
break;
}
- up(&fld->lsf_sem);
-
CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
mf->mf_seq, mf->mf_mds);
int rc;
ENTRY;
- memset(&fld->lsf_stat, 0, sizeof(fld->lsf_stat));
snprintf(fld->lsf_name, sizeof(fld->lsf_name),
"srv-%s", prefix);
- sema_init(&fld->lsf_sem, 1);
-
cache_size = FLD_SERVER_CACHE_SIZE /
sizeof(struct fld_cache_entry);
void fld_server_fini(struct lu_server_fld *fld,
const struct lu_env *env)
{
- __u64 pct;
ENTRY;
- if (fld->lsf_stat.fst_count > 0) {
- pct = fld->lsf_stat.fst_cache * 100;
- do_div(pct, fld->lsf_stat.fst_count);
- } else {
- pct = 0;
- }
-
- printk("FLD cache statistics (%s):\n", fld->lsf_name);
- printk(" Total reqs: "LPU64"\n", fld->lsf_stat.fst_count);
- printk(" Cache reqs: "LPU64"\n", fld->lsf_stat.fst_cache);
- printk(" Cache hits: "LPU64"%%\n", pct);
-
fld_server_proc_fini(fld);
fld_index_fini(fld, env);
LASSERT(fld != NULL);
- memset(&fld->lcf_stat, 0, sizeof(fld->lcf_stat));
snprintf(fld->lcf_name, sizeof(fld->lcf_name),
"cli-%s", prefix);
fld->lcf_count = 0;
spin_lock_init(&fld->lcf_lock);
- sema_init(&fld->lcf_sem, 1);
fld->lcf_hash = &fld_hash[hash];
INIT_LIST_HEAD(&fld->lcf_targets);
void fld_client_fini(struct lu_client_fld *fld)
{
struct lu_fld_target *target, *tmp;
- __u64 pct;
ENTRY;
- if (fld->lcf_stat.fst_count > 0) {
- pct = fld->lcf_stat.fst_cache * 100;
- do_div(pct, fld->lcf_stat.fst_count);
- } else {
- pct = 0;
- }
-
- printk("FLD cache statistics (%s):\n", fld->lcf_name);
- printk(" Total reqs: "LPU64"\n", fld->lcf_stat.fst_count);
- printk(" Cache reqs: "LPU64"\n", fld->lcf_stat.fst_cache);
- printk(" Cache hits: "LPU64"%%\n", pct);
-
fld_client_proc_fini(fld);
spin_lock(&fld->lcf_lock);
int rc;
ENTRY;
- down(&fld->lcf_sem);
-
target = fld_client_get_target(fld, seq);
LASSERT(target != NULL);
CERROR("%s: Can't create FLD entry, rc %d\n",
fld->lcf_name, rc);
}
- up(&fld->lcf_sem);
RETURN(rc);
}
int rc;
ENTRY;
- down(&fld->lcf_sem);
-
fld_cache_delete(fld->lcf_cache, seq);
target = fld_client_get_target(fld, seq);
}
#endif
- up(&fld->lcf_sem);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_delete);
int rc;
ENTRY;
- down(&fld->lcf_sem);
-
- fld->lcf_stat.fst_count++;
-
/* Lookup it in the cache */
rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
- if (rc == 0) {
- fld->lcf_stat.fst_cache++;
- up(&fld->lcf_sem);
+ if (rc == 0)
RETURN(0);
- }
/* Can not find it in the cache */
target = fld_client_get_target(fld, seq);
*/
fld_cache_insert(fld->lcf_cache, seq, *mds);
}
- up(&fld->lcf_sem);
RETURN(rc);
}
EXPORT_SYMBOL(fld_client_lookup);
fid_seq_is_sane(fid_seq(fid)) && fid_oid(fid) != 0;
}
+/* Check whether @fid is the zero fid (both sequence and object id zero). */
+static inline int fid_is_zero(const struct lu_fid *fid)
+{
+ return fid_seq(fid) == 0 && fid_oid(fid) == 0;
+}
+
#define DFID "[0x%16.16"LPF64"x/0x%8.8x:0x%8.8x]"
#define PFID(fid) \
/* Client sequence manager interface. */
struct lu_client_seq {
- /* sequence-controller export. */
+ /* Sequence-controller export. */
struct obd_export *lcs_exp;
struct semaphore lcs_sem;
*/
__u64 lcs_width;
- /* seq-server for direct talking */
+ /* Seq-server used for direct calls, bypassing RPC. */
struct lu_server_seq *lcs_srv;
};
void seq_client_fini(struct lu_client_seq *seq);
-int seq_client_alloc_super(struct lu_client_seq *seq,
- const struct lu_env *env);
-
-int seq_client_replay_super(struct lu_client_seq *seq,
- struct lu_range *range,
- const struct lu_env *env);
-
-int seq_client_alloc_meta(struct lu_client_seq *seq,
- const struct lu_env *env);
-
-int seq_client_alloc_seq(struct lu_client_seq *seq,
- seqno_t *seqnr);
-
int seq_client_alloc_fid(struct lu_client_seq *seq,
struct lu_fid *fid);
/* Fids common stuff */
int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
-
void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src);
void fid_cpu_to_be(struct lu_fid *dst, const struct lu_fid *src);
void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src);
/* Lru list */
struct list_head fci_lru;
+ /* Cache statistics. */
+ struct fld_stats fci_stat;
+
/* Cache name used for debug and messages. */
char fci_name[80];
};
/* Protect index modifications */
struct semaphore lsf_sem;
- /* Server cache statistics. */
- struct fld_stats lsf_stat;
-
/* Fld service name in form "fld-srv-lustre-MDTXXX" */
char lsf_name[80];
};
/* Lock protecting exports list and fld_hash. */
spinlock_t lcf_lock;
- /* Client cache statistics. */
- struct fld_stats lcf_stat;
-
/* Protect fld req + cache modification. */
struct semaphore lcf_sem;
ino_t ino;
ENTRY;
- /* very stupid and having many downsides inode allocation algorithm
- * based on fid. */
+ /*
+ * Very stupid and having many downsides inode allocation algorithm
+ * based on fid.
+ */
ino = (fid_seq(fid) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(fid);
RETURN(ino & 0x7fffffff);
}
struct lu_fid *fid = &info->mti_fid;
struct mdd_object *obj = md2mdd_obj(pobj);
int rc;
-
ENTRY;
+
/* EEXIST check */
if (mdd_is_dead_obj(obj))
RETURN(-ENOENT);