static const char dot[] = ".";
static const char dotdot[] = "..";
+enum mdd_txn_op {
+ MDD_TXN_OBJECT_DESTROY_OP,
+ MDD_TXN_OBJECT_CREATE_OP,
+ MDD_TXN_ATTR_SET_OP,
+ MDD_TXN_XATTR_SET_OP,
+ MDD_TXN_INDEX_INSERT_OP,
+ MDD_TXN_INDEX_DELETE_OP,
+ MDD_TXN_LINK_OP,
+ MDD_TXN_UNLINK_OP,
+ MDD_TXN_RENAME_OP,
+ MDD_TXN_CREATE_DATA_OP,
+ MDD_TXN_MKDIR_OP
+};
+
+struct mdd_txn_op_descr {
+ enum mdd_txn_op mod_op;
+ unsigned int mod_credits;
+};
+
+enum {
+ MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
+ MDD_TXN_OBJECT_CREATE_CREDITS = 20,
+ MDD_TXN_ATTR_SET_CREDITS = 20,
+ MDD_TXN_XATTR_SET_CREDITS = 20,
+ MDD_TXN_INDEX_INSERT_CREDITS = 20,
+ MDD_TXN_INDEX_DELETE_CREDITS = 20,
+ MDD_TXN_LINK_CREDITS = 20,
+ MDD_TXN_UNLINK_CREDITS = 20,
+ MDD_TXN_RENAME_CREDITS = 20,
+ MDD_TXN_CREATE_DATA_CREDITS = 20,
+ MDD_TXN_MKDIR_CREDITS = 20
+};
+
+#define DEFINE_MDD_TXN_OP_DESC(opname) \
+static const struct mdd_txn_op_descr opname = { \
+ .mod_op = opname ## _OP, \
+ .mod_credits = opname ## _CREDITS, \
+}
+
+/*
+ * number of blocks to reserve for particular operations. Should be function
+ * of ... something. Stub for now.
+ */
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
+
+static void mdd_txn_param_build(const struct lu_context *ctx,
+ const struct mdd_txn_op_descr *opd)
+{
+ mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
+}
#define mdd_get_group_info(group_info) do { \
atomic_inc(&(group_info)->usage); \
OBD_FREE_PTR(mdd);
}
+static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
+ lu_printer_t p, const struct lu_object *o)
+{
+ return (*p)(ctxt, cookie, LUSTRE_MDD_NAME"-object@%p", o);
+}
+
+/* orphan handling is here */
+static void mdd_object_delete(const struct lu_context *ctxt,
+ struct lu_object *o)
+{
+ struct mdd_object *mdd_obj = lu2mdd_obj(o);
+ struct thandle *handle = NULL;
+ ENTRY;
+
+ if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
+ return;
+
+ if (test_bit(LU_OBJECT_ORPHAN, &o->lo_header->loh_flags)) {
+ mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
+ handle = mdd_trans_start(ctxt, lu2mdd_dev(o->lo_dev));
+ if (IS_ERR(handle))
+ CERROR("Cannot get thandle\n");
+ else {
+ mdd_write_lock(ctxt, mdd_obj);
+ /* let's remove obj from the orphan list */
+ __mdd_orphan_del(ctxt, mdd_obj, handle);
+ mdd_write_unlock(ctxt, mdd_obj);
+ mdd_trans_stop(ctxt, lu2mdd_dev(o->lo_dev),
+ 0, handle);
+ }
+ }
+}
+
+static struct lu_object_operations mdd_lu_obj_ops = {
+ .loo_object_init = mdd_object_init,
+ .loo_object_start = mdd_object_start,
+ .loo_object_free = mdd_object_free,
+ .loo_object_print = mdd_object_print,
+ .loo_object_delete = mdd_object_delete
+};
+
struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
struct mdd_device *d,
const struct lu_fid *f)
RETURN(rc);
}
-enum mdd_txn_op {
- MDD_TXN_OBJECT_DESTROY_OP,
- MDD_TXN_OBJECT_CREATE_OP,
- MDD_TXN_ATTR_SET_OP,
- MDD_TXN_XATTR_SET_OP,
- MDD_TXN_INDEX_INSERT_OP,
- MDD_TXN_INDEX_DELETE_OP,
- MDD_TXN_LINK_OP,
- MDD_TXN_UNLINK_OP,
- MDD_TXN_RENAME_OP,
- MDD_TXN_CREATE_DATA_OP,
- MDD_TXN_MKDIR_OP
-};
-
-struct mdd_txn_op_descr {
- enum mdd_txn_op mod_op;
- unsigned int mod_credits;
-};
-
-enum {
- MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
- MDD_TXN_OBJECT_CREATE_CREDITS = 20,
- MDD_TXN_ATTR_SET_CREDITS = 20,
- MDD_TXN_XATTR_SET_CREDITS = 20,
- MDD_TXN_INDEX_INSERT_CREDITS = 20,
- MDD_TXN_INDEX_DELETE_CREDITS = 20,
- MDD_TXN_LINK_CREDITS = 20,
- MDD_TXN_UNLINK_CREDITS = 20,
- MDD_TXN_RENAME_CREDITS = 20,
- MDD_TXN_CREATE_DATA_CREDITS = 20,
- MDD_TXN_MKDIR_CREDITS = 20
-};
-
-#define DEFINE_MDD_TXN_OP_DESC(opname) \
-static const struct mdd_txn_op_descr opname = { \
- .mod_op = opname ## _OP, \
- .mod_credits = opname ## _CREDITS, \
-}
-
-/*
- * number of blocks to reserve for particular operations. Should be function
- * of ... something. Stub for now.
- */
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
-
-static void mdd_txn_param_build(const struct lu_context *ctx,
- const struct mdd_txn_op_descr *opd)
-{
- mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
-}
-
-static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
- lu_printer_t p, const struct lu_object *o)
-{
- return (*p)(ctxt, cookie, LUSTRE_MDD_NAME"-object@%p", o);
-}
-
-static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
-{
- int rc;
- struct dt_object *root;
- ENTRY;
-
- root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
- &mdd->mdd_root_fid);
- if (!IS_ERR(root)) {
- LASSERT(root != NULL);
- lu_object_put(ctx, &root->do_lu);
- rc = orph_index_init(ctx, mdd);
- } else
- rc = PTR_ERR(root);
-
- RETURN(rc);
-}
-
static int mdd_txn_start_cb(const struct lu_context *ctx,
struct txn_param *param, void *cookie)
{
mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
mdd->mdd_txn_cb.dtc_cookie = mdd;
- dt_txn_callback_add(dt, &mdd->mdd_txn_cb);
-
RETURN(rc);
}
{
struct mdd_device *mdd = lu2mdd_dev(d);
struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
+
+ return next;
+}
- dt_txn_callback_del(mdd->mdd_child, &mdd->mdd_txn_cb);
+static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
+{
+ int rc;
+ struct dt_object *root;
+ ENTRY;
- return next;
+ dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
+ root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
+ &mdd->mdd_root_fid);
+ if (!IS_ERR(root)) {
+ LASSERT(root != NULL);
+ lu_object_put(ctx, &root->do_lu);
+ rc = orph_index_init(ctx, mdd);
+ } else
+ rc = PTR_ERR(root);
+
+ RETURN(rc);
}
static void mdd_device_shutdown(const struct lu_context *ctxt,
struct mdd_device *m)
{
+ dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
if (m->mdd_obd_dev)
mdd_fini_obd(ctxt, m);
orph_index_fini(ctxt, m);
obd_notify(obd->u.mds.mds_osc_obd, NULL,
obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
OBD_NOTIFY_SYNC, NULL);
-*/
+ */
LASSERT(mdd);
LASSERT(obd);
obd->obd_recovering = 0;
obd->obd_type->typ_dt_ops->o_postrecov(obd);
/* TODO: orphans handling */
+ __mdd_orphan_cleanup(ctxt, mdd);
rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
RETURN(rc);
.ldo_recovery_complete = mdd_recovery_complete
};
-static struct lu_object_operations mdd_lu_obj_ops = {
- .loo_object_init = mdd_object_init,
- .loo_object_start = mdd_object_start,
- .loo_object_free = mdd_object_free,
- .loo_object_print = mdd_object_print
-};
-
void mdd_write_lock(const struct lu_context *ctxt, struct mdd_object *obj)
{
struct dt_object *next = mdd_object_child(obj);
/* Return LOV & COOKIES unconditionally here. We clean evth up.
* Caller must be ready for that. */
rc = __mdd_lmm_get(ctxt, obj, ma);
- rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj), obj, ma);
+ if ((ma->ma_valid & MA_LOV))
+ rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
+ obj, ma);
}
RETURN(rc);
}
rc = __mdd_iattr_get(ctxt, obj, ma);
if (rc == 0 && ma->ma_attr.la_nlink == 0) {
- if (obj->mod_count == 0) {
+ /* add new orphan and the object
+ * will be deleted during the object_put() */
+ if (__mdd_orphan_add(ctxt, obj, th) == 0)
+ set_bit(LU_OBJECT_ORPHAN,
+ &mdd2lu_obj(obj)->lo_header->loh_flags);
+
+ if (obj->mod_count == 0)
rc = __mdd_object_kill(ctxt, obj, ma);
- } else {
- /* add new orphan */
- rc = __mdd_orphan_add(ctxt, obj, th);
- }
}
RETURN(rc);
}
struct mdd_object *mdd_obj = md2mdd_obj(obj);
int rc = 0;
- mdd_write_lock(ctxt, md2mdd_obj(obj));
+ mdd_write_lock(ctxt, mdd_obj);
rc = mdd_open_sanity_check(ctxt, mdd_obj, flags, uc);
if (rc == 0)
- md2mdd_obj(obj)->mod_count ++;
+ mdd_obj->mod_count ++;
- mdd_write_unlock(ctxt, md2mdd_obj(obj));
+ mdd_write_unlock(ctxt, mdd_obj);
return rc;
}
struct md_attr *ma, struct md_ucred *uc)
{
int rc;
- struct mdd_object *mdd_obj;
- struct thandle *handle = NULL;
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
ENTRY;
- mdd_obj = md2mdd_obj(obj);
- mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
- handle = mdd_trans_start(ctxt, mdo2mdd(obj));
- if (IS_ERR(handle))
- RETURN(-ENOMEM);
-
mdd_write_lock(ctxt, mdd_obj);
+ /* release open count */
+ mdd_obj->mod_count --;
+
rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
- if (rc == 0 && (-- mdd_obj->mod_count) == 0) {
- if (ma->ma_attr.la_nlink == 0) {
+ if (rc == 0 && mdd_obj->mod_count == 0) {
+ if (ma->ma_attr.la_nlink == 0)
rc = __mdd_object_kill(ctxt, mdd_obj, ma);
- if (rc == 0)
- /* let's remove obj from the orphan list */
- rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
- }
}
mdd_write_unlock(ctxt, mdd_obj);
- mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
RETURN(rc);
}
struct orph_key *key = orph_key_fill(ctx, mdo2fid(obj), op);
int rc;
ENTRY;
-
+ LASSERT(dor);
rc = dor->do_index_ops->dio_delete(ctx, dor,
(struct dt_key *)key, th);
RETURN(rc);
if (mdo->mod_count == 0) {
/* non-opened orphan, let's delete it */
struct md_attr *ma = &mdd_ctx_info(ctx)->mti_ma;
+ CWARN("Found orphan!\n");
__mdd_object_kill(ctx, mdo, ma);
/* TODO: now handle OST objects */
//mdd_ost_objects_destroy(ctx, ma);
{
ENTRY;
if (mdd->mdd_orphans != NULL) {
- if (!IS_ERR(mdd->mdd_orphans))
- lu_object_put(ctx, &mdd->mdd_orphans->do_lu);
+ lu_object_put(ctx, &mdd->mdd_orphans->do_lu);
mdd->mdd_orphans = NULL;
}
EXIT;
}
+int __mdd_orphan_cleanup(const struct lu_context *ctx, struct mdd_device *d)
+{
+ return orph_index_iterate(ctx, d);
+}
+
int __mdd_orphan_add(const struct lu_context *ctx,
- struct mdd_object *obj,
- struct thandle *th)
+ struct mdd_object *obj, struct thandle *th)
{
loff_t offset = 0;
return orph_index_insert(ctx, obj, ORPH_OP_UNLINK, &offset, th);
}
int __mdd_orphan_del(const struct lu_context *ctx,
- struct mdd_object *obj,
- struct thandle *th)
+ struct mdd_object *obj, struct thandle *th)
{
return orph_index_delete(ctx, obj, ORPH_OP_UNLINK, th);
}
+/*
+ * used when destroying orphanes and from mds_reint_unlink() when MDS wants to
+ * destroy objects on OSS.
+ */
+/*
+int mdd_objects_destroy(struct mds_obd *mds, struct inode *inode,
+ struct lov_mds_md *lmm, int lmm_size,
+ struct llog_cookie *logcookies,
+ int log_unlink, int async)
+{
+ struct lov_stripe_md *lsm = NULL;
+ struct obd_trans_info oti = { 0 };
+ struct obdo *oa;
+ int rc;
+ ENTRY;
+
+ if (lmm_size == 0)
+ RETURN(0);
+ rc = obd_unpackmd(mds->mds_dt_exp, &lsm, lmm, lmm_size);
+ if (rc < 0) {
+ CERROR("Error unpack md %p\n", lmm);
+ RETURN(rc);
+ } else {
+ LASSERT(rc >= sizeof(*lsm));
+ rc = 0;
+ }
+
+ oa = obdo_alloc();
+ if (oa == NULL)
+ GOTO(out_free_memmd, rc = -ENOMEM);
+ oa->o_id = lsm->lsm_object_id;
+ oa->o_gr = FILTER_GROUP_MDS0 + mds->mds_num;
+ oa->o_mode = inode->i_mode & S_IFMT;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
+
+ if (log_unlink && logcookies) {
+ oa->o_valid |= OBD_MD_FLCOOKIE;
+ oti.oti_logcookies = logcookies;
+ }
+
+ CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
+ (int)oa->o_id, (int)oa->o_gr);
+
+ if (async)
+ oti.oti_flags |= OBD_MODE_ASYNC;
+
+ rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
+ obdo_free(oa);
+out_free_memmd:
+ obd_free_memmd(mds->mds_dt_exp, &lsm);
+ RETURN(rc);
+}
+*/