RETURN(result);
}
-int fld_server_init(struct fld *fld, struct dt_device *dt)
+int fld_server_init(struct lu_context *ctx, struct fld *fld,
+ struct dt_device *dt)
{
int result;
struct ptlrpc_service_conf fld_conf = {
.psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
.psc_num_threads = FLD_NUM_THREADS
};
- struct fld_info *fld_info;
fld->fld_dt = dt;
lu_device_get(&dt->dd_lu_dev);
INIT_LIST_HEAD(&fld_list_head.fld_list);
spin_lock_init(&fld_list_head.fld_lock);
- OBD_ALLOC_PTR(fld_info);
- if(!fld_info)
- return -ENOMEM;
- fld_info_init(fld_info);
- fld->fld_info = fld_info;
+ fld_iam_init(ctx, fld);
fld->fld_service =
ptlrpc_init_svc_conf(&fld_conf, fld_req_handle,
else
result = -ENOMEM;
if (result != 0) {
+ fld_iam_fini(fld);
fld_server_fini(fld);
- fld_info_fini(fld_info);
}
return result;
}
}
spin_unlock(&fld_list_head.fld_lock);
lu_device_put(&fld->fld_dt->dd_lu_dev);
- fld_info_fini(fld->fld_info);
+ fld_iam_fini(fld);
fld->fld_dt = NULL;
}
EXPORT_SYMBOL(fld_server_fini);
#include <linux/lustre_iam.h>
#include "fld_internal.h"
+
struct iam_descr fld_param = {
.id_key_size = sizeof ((struct lu_fid *)0)->f_seq,
.id_ptr_size = 4, /* 32 bit block numbers for now */
.id_rec_size = sizeof(mdsno_t),
.id_node_gap = 0, /* no gaps in index nodes */
- .id_root_gap = 0,
-
-#if 0
- .id_root_ptr = iam_root_ptr, /* returns 0: root is always at the
- * beginning of the file (as it
- * htree) */
- .id_node_read = iam_node_read,
- .id_node_check = iam_node_check,
- .id_node_init = iam_node_init,
- .id_keycmp = iam_keycmp,
-#endif
+ .id_root_gap = sizeof(struct iam_root),
+ .id_ops = &generic_iam_ops,
+ .id_leaf_ops = &lfix_leaf_ops
+};
+/*
+ * number of blocks to reserve for particular operations. Should be function
+ * of ... something. Stub for now.
+ */
+
+enum {
+ FLD_TXN_INDEX_INSERT_CREDITS = 10,
+ FLD_TXN_INDEX_DELETE_CREDITS = 10
};
+static int fld_keycmp(struct iam_container *c, struct iam_key *k1,
+ struct iam_key *k2)
+{
+ __u64 p1 = le64_to_cpu(*(__u32 *)k1);
+ __u64 p2 = le64_to_cpu(*(__u32 *)k2);
+
+ return p1 > p2 ? +1 : (p1 < p2 ? -1 : 0);
+
+}
+
int fld_handle_insert(const struct lu_context *ctx, struct fld *fld,
- fidseq_t seq_num, mdsno_t mdsno)
+ fidseq_t seq_num, mdsno_t mds_num)
{
- /*
- * XXX Use ->dio_index_insert() from struct dt_index_operations. The
- * same below.
- */
-#if 0
- return fld->fld_dt->dd_ops->dt_iam_insert(&lctx, fld->fld_dt,
- fld->fld_info->fi_container,
- &seq_num, fld_param.id_key_size,
- &mdsno, fld_param.id_rec_size);
-#else
- return 0;
-#endif
+ struct dt_device *dt = fld->fld_dt;
+ struct dt_object *dt_obj = fld->fld_obj;
+ struct txn_param txn;
+ struct thandle *th;
+ int rc;
+
+
+ /*stub here, will fix it later*/
+ txn.tp_credits = FLD_TXN_INDEX_INSERT_CREDITS;
+
+ th = dt->dd_ops->dt_trans_start(ctx, dt, &txn);
+
+ rc = dt_obj->do_index_ops->dio_insert(ctx, dt_obj,
+ (struct dt_rec*)(&mds_num),
+ (struct dt_key*)(&seq_num), th);
+ dt->dd_ops->dt_trans_stop(ctx, th);
+
+ RETURN(rc);
}
int fld_handle_delete(const struct lu_context *ctx, struct fld *fld,
fidseq_t seq_num, mdsno_t mds_num)
{
-#if 0
- return fld->fld_dt->dd_ops->dt_iam_delete(&lctx, fld->fld_dt,
- fld->fld_info->fi_container,
- &seq_num, fld_param.id_key_size,
- &mds_num, fld_param.id_rec_size);
-#else
- return 0;
-#endif
+ struct dt_device *dt = fld->fld_dt;
+ struct dt_object *dt_obj = fld->fld_obj;
+ struct txn_param txn;
+ struct thandle *th;
+ int rc;
+
+
+ txn.tp_credits = FLD_TXN_INDEX_DELETE_CREDITS;
+ th = dt->dd_ops->dt_trans_start(ctx, dt, &txn);
+ rc = dt_obj->do_index_ops->dio_delete(ctx, dt_obj,
+ (struct dt_rec*)(&mds_num),
+ (struct dt_key*)(&seq_num), th);
+ dt->dd_ops->dt_trans_stop(ctx, th);
+
+ RETURN(rc);
}
int fld_handle_lookup(const struct lu_context *ctx,
struct fld *fld, fidseq_t seq_num, mdsno_t *mds_num)
{
-#if 0
- int size;
-
- size = fld_param.id_rec_size;
- return fld->fld_dt->dd_ops->dt_iam_lookup(&lctx, fld->fld_dt,
- fld->fld_info->fi_container,
- &seq_num, fld_param.id_key_size,
- mds_num, &size);
-#else
- return 0;
-#endif
+
+ struct dt_device *dt = fld->fld_dt;
+ struct dt_object *dt_obj = fld->fld_obj;
+
+ return dt_obj->do_index_ops->dio_lookup(ctx, dt_obj,
+ (struct dt_rec*)(&mds_num),
+ (struct dt_key*)(&seq_num));
}
-int fld_info_init(struct fld_info *fld_info)
+#define FLD_OBJ_FID {1000, 1000, 1000}
+#define FLD_OBJ_MODE S_IFDIR
+int fld_iam_init(struct lu_context *ctx, struct fld *fld)
{
- struct file *fld_file;
- int rc;
+ struct lu_fid obj_fid = FLD_OBJ_FID; /* reserved fid */
+ struct dt_device *dt = fld->fld_dt;
+ struct dt_object *dt_obj;
+ struct iam_container *ic;
+ int rc = 0;
+
ENTRY;
- fld_file = filp_open("/dev/null", O_RDWR, S_IRWXU);
- /* sanity and security checks... */
- OBD_ALLOC(fld_info->fi_container, sizeof(struct iam_container));
- if (!fld_info->fi_container)
- RETURN(-ENOMEM);
+ dt_obj = dt_object_find(ctx, dt, &obj_fid);
+ if (IS_ERR(dt_obj)) {
+ CERROR("can not find fld obj %lu \n", PTR_ERR(dt_obj));
+ RETURN(PTR_ERR(dt_obj));
+ }
+
+ lu_object_get(&dt_obj->do_lu);
+ fld->fld_obj = dt_obj;
+ OBD_ALLOC_PTR(ic);
- rc = iam_container_init(fld_info->fi_container, &fld_param,
- fld_file->f_dentry->d_inode);
+ rc = dt_obj->do_index_ops->dio_init(ctx, dt, dt_obj, ic, &fld_param);
+ fld_param.id_ops->id_keycmp = fld_keycmp;
RETURN(rc);
}
-void fld_info_fini(struct fld_info *fld_info)
+void fld_iam_fini(struct fld *fld)
{
- iam_container_fini(fld_info->fi_container);
- OBD_FREE(fld_info->fi_container, sizeof(struct iam_container));
- OBD_FREE_PTR(fld_info);
+ struct dt_object *dt_obj = fld->fld_obj;
+
+ dt_obj->do_index_ops->dio_fini(dt_obj);
+ /*XXX Should put object here,
+ lu_object_put(fld->fld_obj->do_lu);
+ *but no ctxt in this func, FIX later*/
+ fld->fld_obj = NULL;
}
int fld_handle_lookup(const struct lu_context *ctx,
struct fld *fld, fidseq_t seq_num, mdsno_t *mds);
-int fld_info_init(struct fld_info *fld_info);
-void fld_info_fini(struct fld_info *fld_info);
+int fld_iam_init(struct lu_context *ctx, struct fld *fld);
+void fld_iam_fini(struct fld *fld);
#endif
*/
int (*dio_probe)(const struct lu_context *ctxt, struct dt_object *dt,
const struct dt_index_features *feat);
+
+ int (*dio_init)(struct lu_context *ctxt, struct dt_device *dt,
+ struct dt_object *dt_obj, void *container, void *param);
+
+ int (*dio_fini)(struct dt_object *dt_obj);
};
struct dt_device {
struct dt_index_operations *do_index_ops;
};
+struct dt_object *dt_object_find(struct lu_context *ctxt,
+ struct dt_device *d,
+ struct lu_fid *f);
int dt_object_init(struct dt_object *obj,
struct lu_object_header *h, struct lu_device *d);
/*
* fld (fid location database) interface.
*/
-struct fld_info {
- void *fi_container;
-};
struct fld {
struct proc_dir_entry *fld_proc_entry;
struct ptlrpc_service *fld_service;
struct dt_device *fld_dt;
- struct fld_info *fld_info;
+ struct dt_object *fld_obj;
+ void *fld_container;
};
-int fld_server_init(struct fld *fld, struct dt_device *dt);
+int fld_server_init(struct lu_context *ctx, struct fld *fld,
+ struct dt_device *dt);
void fld_server_fini(struct fld *fld);
-
#endif /* __LINUX_OBD_CLASS_H */
* FLD wrappers
*/
-static int mdt_fld_init(struct mdt_device *m)
+static int mdt_fld_init(struct lu_context *ctx, struct mdt_device *m)
{
struct lu_site *ls;
int rc;
OBD_ALLOC_PTR(ls->ls_fld);
if (ls->ls_fld != NULL)
- rc = fld_server_init(ls->ls_fld, m->mdt_bottom);
+ rc = fld_server_init(ctx, ls->ls_fld, m->mdt_bottom);
else
rc = -ENOMEM;
/* init sequence info after device stack is initialized. */
rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
+
+ lu_context_exit(&ctx);
if (rc)
GOTO(err_fini_mgr, rc);
+ lu_context_enter(&ctx);
+ rc = mdt_fld_init(&ctx, m);
+ lu_context_exit(&ctx);
+ if (rc)
+ GOTO(err_free_fld, rc);
+
+ lu_context_fini(&ctx);
+
snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
if (m->mdt_namespace == NULL)
ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
- rc = mdt_fld_init(m);
- if (rc)
- GOTO(err_free_ns, rc);
rc = mdt_start_ptlrpc_service(m);
if (rc)
- GOTO(err_free_fld, rc);
+ GOTO(err_free_ns, rc);
lu_context_exit(&ctx);
lu_context_fini(&ctx);
RETURN(0);
-err_free_fld:
- mdt_fld_fini(m);
err_free_ns:
ldlm_namespace_free(m->mdt_namespace, 0);
m->mdt_namespace = NULL;
+err_free_fld:
+ mdt_fld_fini(m);
err_fini_mgr:
seq_mgr_fini(m->mdt_seq_mgr);
m->mdt_seq_mgr = NULL;
-err_fini_stack:
- mdt_stack_fini(&ctx, m, md2lu_dev(m->mdt_child));
err_fini_ctx:
lu_context_exit(&ctx);
lu_context_fini(&ctx);
+err_fini_stack:
+ mdt_stack_fini(&ctx, m, md2lu_dev(m->mdt_child));
err_fini_site:
lu_site_fini(s);
OBD_FREE_PTR(s);
bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
spin_lock(&s->ls_guard);
o = htable_lookup(s, bucket, f);
+
spin_unlock(&s->ls_guard);
if (o != NULL)
return o;
int lu_object_header_init(struct lu_object_header *h)
{
memset(h, 0, sizeof *h);
+ h->loh_ref = 1;
INIT_HLIST_NODE(&h->loh_hash);
CFS_INIT_LIST_HEAD(&h->loh_lru);
CFS_INIT_LIST_HEAD(&h->loh_layers);
/*
* DT methods.
*/
+static struct dt_object *dt_obj(struct lu_object *o)
+{
+ return container_of0(o, struct dt_object, do_lu);
+}
+
static int osd_root_get(const struct lu_context *ctx,
struct dt_device *dev, struct lu_fid *f)
{
return osd_inode_get_fid(d, osd_sb(d)->s_root->d_inode, f);
}
+struct dt_object *dt_object_find(struct lu_context *ctxt,
+ struct dt_device *d,
+ struct lu_fid *f)
+{
+ struct lu_object *o;
+
+ o = lu_object_find(ctxt, d->dd_lu_dev.ld_site, f);
+ if (IS_ERR(o))
+ return (struct dt_object *)o;
+ else {
+ o = lu_object_locate(o->lo_header, &osd_device_type);
+ LASSERT(lu_device_is_osd(o->lo_dev));
+ return dt_obj(o);
+ }
+}
+EXPORT_SYMBOL(dt_object_find);
/*
* OSD object methods.
result = osd_fid_lookup(ctxt, obj, lu_object_fid(l));
if (result == 0) {
+ /*FIXME: put osd_index_ops here for tmp fix WANGDI*/
+ obj->oo_dt.do_index_ops = &osd_index_ops;
if (obj->oo_inode != NULL)
osd_object_init0(obj);
}
LASSERT(dentry->d_inode != NULL);
obj->oo_inode = dentry->d_inode;
igrab(obj->oo_inode);
+ obj->oo_dt.do_index_ops = &osd_index_ops;
}
dput(dentry);
} else
break;
}
return result;
- }
+}
static int osd_object_create(const struct lu_context *ctx, struct dt_object *dt,
struct lu_attr *attr, struct thandle *th)
static struct dt_body_operations osd_body_ops = {
};
+#if 0
+static int osd_index_delete(struct lu_context *ctxt, struct dt_object *dt,
+ const struct lu_fid *fid, const char *name,
+ struct thandle *th)
+{
+ struct osd_object *osj = osd_dt_obj(dt);
+ struct osd_thandle *oh;
+ struct iam_path_descr ipd;
+ int rc;
+ ENTRY;
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle != NULL);
+ if (osj->oo_container)
+ rc = iam_delete(oh->ot_handle, osj->oo_container, name, &ipd);
+
+ RETURN(rc);
+}
+#endif
/*
* Index operations.
*/
}
/*
+ * XXX temporary stub stuff for ipd
+ */
+static void ipd_init(struct iam_path_descr *ipd, __u64 *key)
+{
+ int i;
+ for (i = 0; i < DX_SCRATCH_KEYS; i++, key++)
+ ipd->ipd_key_scratch[i] = key;
+}
+
+/*
* XXX This is temporary solution: inode operations are used until iam is
* ready.
*/
static int osd_index_lookup(const struct lu_context *ctxt, struct dt_object *dt,
struct dt_rec *rec, const struct dt_key *key)
{
+#if 1
+ struct osd_object *osj = osd_dt_obj(dt);
+ struct osd_thandle *oh;
+ struct iam_path_descr ipd;
+ __u64 scratch_key[DX_SCRATCH_KEYS];
+ int rc = 0;
+
+ ENTRY;
+
+ ipd_init(&ipd, scratch_key);
+ if (osj->oo_container)
+ rc = iam_lookup(osj->oo_container, key, rec, &ipd);
+
+ RETURN(rc);
+#else
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
result = -ENOMEM;
dput(parent);
return result;
+#endif
}
static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
return result;
}
+
/*
* XXX Temporary stuff.
*/
static int osd_index_insert(const struct lu_context *ctx, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *handle)
+ struct thandle *th)
{
+#if 1
+ struct osd_object *osj = osd_dt_obj(dt);
+ struct osd_thandle *oh;
+ struct iam_path_descr ipd;
+ __u64 scratch_key[DX_SCRATCH_KEYS];
+ int rc = 0;
+
+ ENTRY;
+
+ ipd_init(&ipd, scratch_key);
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle != NULL);
+ if (osj->oo_container)
+ rc = iam_insert(oh->ot_handle, osj->oo_container, key, rec, &ipd);
+
+ RETURN(rc);
+#else
const struct lu_fid *fid = (const struct lu_fid *)rec;
const char *name = (const char *)key;
} else
result = PTR_ERR(luch);
return result;
+#endif
}
const struct dt_index_features dt_directory_features;
else
return 0; /* nothing yet is supported */
}
+#define osd_fld_name "fld_iam"
+static int osd_index_init(struct lu_context *ctx, struct dt_device *dt,
+ struct dt_object *dt_obj, void *container,
+ void *param)
+{
+ struct osd_object *obj = osd_dt_obj(dt_obj);
+ struct osd_device *osd = osd_obj2dev(obj);
+
+ if (!obj->oo_container) {
+ /*stub for fld_iam*/
+ struct dentry *dentry;
+
+ dentry = osd_open(osd_sb(osd)->s_root, osd_fld_name,
+ S_IFREG);
+ if (IS_ERR(dentry)) {
+ CERROR("can not open %s, rc = %d \n", osd_fld_name,
+ PTR_ERR(dentry));
+ return (PTR_ERR(dentry));
+ }
+ obj->oo_inode = dentry->d_inode;
+ obj->oo_dt.do_index_ops = &osd_index_ops;
+ iam_container_init(container, param, obj->oo_inode);
+ obj->oo_container = container;
+ dput(dentry);
+ }
+
+ return 0;
+}
+
+static int osd_index_fini(struct dt_object *dt_obj)
+{
+ struct osd_object *obj = osd_dt_obj(dt_obj);
+
+ if (obj->oo_container) {
+ iam_container_fini(obj->oo_container);
+ OBD_FREE(obj->oo_container, sizeof(struct iam_container));
+ obj->oo_container = NULL;
+ }
+ return 0;
+}
static struct dt_index_operations osd_index_ops = {
.dio_lookup = osd_index_lookup,
.dio_insert = osd_index_insert,
- .dio_probe = osd_index_probe
+ .dio_probe = osd_index_probe,
+ .dio_init = osd_index_init,
+ .dio_fini = osd_index_fini,
};
/*
*/
struct inode *oo_inode;
struct rw_semaphore oo_sem;
+ void *oo_container;
};
/*
* XXX temporary: for ->i_op calls.
*/
struct qstr oti_str;
-
+ void *scratch_key;
};
#endif /* __KERNEL__ */