lu_object_init(o, NULL, d);
mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
- atomic_set(&mdd_obj->mod_count, 0);
+ mdd_obj->mod_count = 0;
o->lo_ops = &mdd_lu_obj_ops;
return o;
} else {
struct mdd_object *m;
ENTRY;
- o = lu_object_find(ctxt, d->mdd_md_dev.md_lu_dev.ld_site, f);
+ o = lu_object_find(ctxt, mdd2lu_dev(d)->ld_site, f);
if (IS_ERR(o))
m = (struct mdd_object *)o;
else
m = lu2mdd_obj(lu_object_locate(o->lo_header,
- d->mdd_md_dev.md_lu_dev.ld_type));
+ mdd2lu_dev(d)->ld_type));
RETURN(m);
}
-static inline void mdd_object_put(const struct lu_context *ctxt,
- struct mdd_object *o)
-{
- lu_object_put(ctxt, &o->mod_obj.mo_lu);
-}
-
static inline int mdd_is_immutable(struct mdd_object *obj)
{
return obj->mod_flags & IMMUTE_OBJ;
/* return md_attr back,
* if it is last unlink then return lov ea + llog cookie*/
-static inline int __mdd_object_kill(const struct lu_context *ctxt,
- struct mdd_object *obj,
- struct md_attr *ma)
+int __mdd_object_kill(const struct lu_context *ctxt,
+ struct mdd_object *obj,
+ struct md_attr *ma)
{
int rc = 0;
rc = __mdd_iattr_get(ctxt, obj, ma);
if (rc == 0 && ma->ma_attr.la_nlink == 0) {
- if (atomic_read(&obj->mod_count) == 0) {
+ if (obj->mod_count == 0) {
rc = __mdd_object_kill(ctxt, obj, ma);
} else {
/* add new orphan */
int flags)
{
int mode = accmode(md2mdd_obj(obj), flags);
+
+        mdd_write_lock(ctxt, md2mdd_obj(obj));
         if (mode & MAY_WRITE) {
-                if (mdd_is_immutable(md2mdd_obj(obj)))
-                        RETURN(-EACCES);
+                if (mdd_is_immutable(md2mdd_obj(obj))) {
+                        /* don't leak the write lock on the error path */
+                        mdd_write_unlock(ctxt, md2mdd_obj(obj));
+                        RETURN(-EACCES);
+                }
         }
+
-        atomic_inc(&md2mdd_obj(obj)->mod_count);
+        md2mdd_obj(obj)->mod_count++;
+        mdd_write_unlock(ctxt, md2mdd_obj(obj));
return 0;
}
ENTRY;
mdd_obj = md2mdd_obj(obj);
- mdd_read_lock(ctxt, mdd_obj);
- rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
- if (rc)
- GOTO(out_locked, rc);
+ mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
+ handle = mdd_trans_start(ctxt, mdo2mdd(obj));
+ if (IS_ERR(handle))
+ RETURN(-ENOMEM);
- if (atomic_dec_and_test(&mdd_obj->mod_count)) {
+        mdd_write_lock(ctxt, mdd_obj);
+        /* drop the open count unconditionally: a failed attr read must not
+         * leave the count pinned, it only decides whether this last close
+         * has to reap the object as an orphan */
+        mdd_obj->mod_count--;
+        rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
+        if (rc == 0 && mdd_obj->mod_count == 0) {
if (ma->ma_attr.la_nlink == 0) {
rc = __mdd_object_kill(ctxt, mdd_obj, ma);
- if (rc)
- GOTO(out_locked, rc);
- mdd_read_unlock(ctxt, mdd_obj);
- /* let's remove obj from the orphan list */
- mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
- handle = mdd_trans_start(ctxt, mdo2mdd(obj));
- if (IS_ERR(handle))
- GOTO(out, rc = -ENOMEM);
-
- rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
-
- mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
- GOTO(out, rc);
+ if (rc == 0)
+ /* let's remove obj from the orphan list */
+ rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
}
}
-out_locked:
- mdd_read_unlock(ctxt, mdd_obj);
-out:
+ mdd_write_unlock(ctxt, mdd_obj);
+ mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
RETURN(rc);
}
struct mdd_object {
struct md_object mod_obj;
/* open count */
- atomic_t mod_count;
+ __u32 mod_count;
__u32 mod_valid;
unsigned long mod_flags;
};
struct txn_param mti_param;
struct lu_fid mti_fid;
struct lu_attr mti_la;
+ struct md_attr mti_ma;
struct lu_attr mti_la_for_fix;
struct lov_mds_md mti_lmm;
struct obd_info mti_oi;
struct thandle *);
int orph_index_init(const struct lu_context *ctx, struct mdd_device *mdd);
void orph_index_fini(const struct lu_context *ctx, struct mdd_device *mdd);
+int __mdd_object_kill(const struct lu_context *, struct mdd_object *,
+ struct md_attr *);
+struct mdd_object *mdd_object_find(const struct lu_context *,
+ struct mdd_device *,
+ const struct lu_fid *);
+static inline void mdd_object_put(const struct lu_context *ctxt,
+ struct mdd_object *o)
+{
+ lu_object_put(ctxt, &o->mod_obj.mo_lu);
+}
extern struct lu_device_operations mdd_lu_ops;
static inline int lu_device_is_mdd(struct lu_device *d)
RETURN(rc);
}
-#if 0
-static int orph_index_iterate(struct lu_server_orph *orph,
- const struct lu_context *ctx,
- seqno_t seq, mdsno_t *mds)
+
+static inline struct orph_key *orph_key_empty(const struct lu_context *ctx,
+ __u32 op)
{
- struct dt_object *dt_obj = orph->orph_obj;
- struct dt_rec *rec = orph_rec(ctx, 0);
- int rc;
+ struct orph_key *key = &mdd_ctx_info(ctx)->mti_orph_key;
+ LASSERT(key);
+ key->ok_fid.f_seq = 0;
+ key->ok_fid.f_oid = 0;
+ key->ok_fid.f_ver = 0;
+ key->ok_op = cpu_to_be32(op);
+ return key;
+}
+
+static void orph_key_test_and_del(const struct lu_context *ctx,
+ struct mdd_device *mdd,
+ const struct orph_key *key)
+{
+ struct mdd_object *mdo;
+
+ mdo = mdd_object_find(ctx, mdd, &key->ok_fid);
+ if (IS_ERR(mdo))
+ CERROR("Invalid orphan!\n");
+ else {
+ if (mdo->mod_count == 0) {
+ /* non-opened orphan, let's delete it */
+ struct md_attr *ma = &mdd_ctx_info(ctx)->mti_ma;
+ __mdd_object_kill(ctx, mdo, ma);
+ /* TODO: now handle OST objects */
+ //mdd_ost_objects_destroy(ctx, ma);
+ /* TODO: destroy index entry */
+ }
+ mdd_object_put(ctx, mdo);
+ }
+}
+
+static int orph_index_iterate(const struct lu_context *ctx,
+ struct mdd_device *mdd)
+{
+ struct dt_object *dt_obj = mdd->mdd_orphans;
+ struct dt_it *it;
+ struct dt_it_ops *iops;
+ struct orph_key *key = orph_key_empty(ctx, 0);
+ int result;
ENTRY;
- rc = dt_obj->do_index_ops->dio_lookup(ctx, dt_obj, rec,
- orph_key(ctx, seq));
- if (rc == 0)
- *mds = be64_to_cpu(*(__u64 *)rec);
- RETURN(rc);
+ iops = &dt_obj->do_index_ops->dio_it;
+ it = iops->init(ctx, dt_obj, 1);
+ if (it != NULL) {
+        result = iops->get(ctx, it, (const void *)key);
+        if (result > 0) {
+                int i;
+                /* main cycle: the iterator is already positioned on the
+                 * first entry by ->get(); ->next() returns 0 while more
+                 * entries remain and +1 once the end is reached (assumed
+                 * dt_it_ops convention -- TODO confirm).  The previous
+                 * condition (result == +1 with result initialized to 0)
+                 * made the loop body unreachable. */
+                for (result = 0, i = 0; result == 0; ++i) {
+                        key = (void *)iops->key(ctx, it);
+                        orph_key_test_and_del(ctx, mdd, key);
+                        result = iops->next(ctx, it);
+                }
+                if (result > 0)
+                        /* reaching the end of the index is success */
+                        result = 0;
+                iops->put(ctx, it);
+ } else if (result == 0)
+ /* Index contains no zero key? */
+ result = -EIO;
+ iops->fini(ctx, it);
+ } else
+ result = -ENOMEM;
+
+ RETURN(result);
}
-#endif
+
int orph_index_init(const struct lu_context *ctx, struct mdd_device *mdd)
{
struct lu_fid fid;