static const struct dt_object_operations osd_obj_ops;
static const struct dt_object_operations osd_obj_ea_ops;
static const struct dt_body_operations osd_body_ops;
+static const struct dt_body_operations osd_body_ops_new;
static const struct dt_index_operations osd_index_iam_ops;
static const struct dt_index_operations osd_index_ea_ops;
+#define OSD_TRACK_DECLARES
+#ifdef OSD_TRACK_DECLARES
+#define OSD_DECLARE_OP(oh, op) { \
+ LASSERT(oh->ot_handle == NULL); \
+ ((oh)->ot_declare_ ##op)++; }
+#define OSD_EXEC_OP(handle, op) { \
+ struct osd_thandle *oh; \
+ oh = container_of0(handle, struct osd_thandle, ot_super);\
+ if (((oh)->ot_declare_ ##op) > 0) { \
+ ((oh)->ot_declare_ ##op)--; \
+ } \
+ }
+#else
+#define OSD_DECLARE_OP(oh, op)
+#define OSD_EXEC_OP(oh, op)
+#endif
+
+/* There are at most 10 uid/gids are affected in a transaction, and
+ * that's rename case:
+ * - 2 for source parent uid & gid;
+ * - 2 for source child uid & gid ('..' entry update when the child
+ * is directory);
+ * - 2 for target parent uid & gid;
+ * - 2 for target child uid & gid (if the target child exists);
+ * - 2 for root uid & gid (last_rcvd, llog, etc);
+ *
+ * The 0 to (OSD_MAX_UGID_CNT - 1) bits of ot_id_type is for indicating
+ * the id type of each id in the ot_id_array.
+ */
+#define OSD_MAX_UGID_CNT 10
+
struct osd_thandle {
struct thandle ot_super;
handle_t *ot_handle;
cfs_list_t ot_dcb_list;
/* Link to the device, for debugging. */
struct lu_ref_link *ot_dev_link;
+ unsigned short ot_credits;
+ unsigned short ot_id_cnt;
+ unsigned short ot_id_type;
+ uid_t ot_id_array[OSD_MAX_UGID_CNT];
+
+#ifdef OSD_TRACK_DECLARES
+ unsigned char ot_declare_attr_set;
+ unsigned char ot_declare_punch;
+ unsigned char ot_declare_xattr_set;
+ unsigned char ot_declare_create;
+ unsigned char ot_declare_destroy;
+ unsigned char ot_declare_ref_add;
+ unsigned char ot_declare_ref_del;
+ unsigned char ot_declare_write;
+ unsigned char ot_declare_insert;
+ unsigned char ot_declare_delete;
+#endif
#if OSD_THANDLE_STATS
/** time when this handle was allocated */
#endif
};
+/**
+ * Basic transaction credit op
+ */
+enum dt_txn_op {
+ DTO_INDEX_INSERT,
+ DTO_INDEX_DELETE,
+ DTO_INDEX_UPDATE,
+ DTO_OBJECT_CREATE,
+ DTO_OBJECT_DELETE,
+ DTO_ATTR_SET_BASE,
+ DTO_XATTR_SET,
+ DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
+ DTO_WRITE_BASE,
+ DTO_WRITE_BLOCK,
+ DTO_ATTR_SET_CHOWN,
+
+ DTO_NR
+};
+
/*
* Helpers.
*/
return 0;
}
+static inline int osd_qid_type(struct osd_thandle *oh, int i)
+{
+ return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA;
+}
+
+static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
+{
+ oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
+}
+
+static void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
+ int type, uid_t id, struct inode *inode)
+{
+#ifdef CONFIG_QUOTA
+ int i, allocated = 0;
+ struct osd_object *obj;
+
+ LASSERT(dt != NULL);
+ LASSERT(oh != NULL);
+ LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u",
+ oh->ot_id_cnt);
+
+ /* id entry is allocated in the quota file */
+ if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off)
+ allocated = 1;
+
+ for (i = 0; i < oh->ot_id_cnt; i++) {
+ if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type)
+ return;
+ }
+
+ if (unlikely(i >= OSD_MAX_UGID_CNT)) {
+ CERROR("more than %d uid/gids for a transaction?\n", i);
+ return;
+ }
+
+ oh->ot_id_array[i] = id;
+ osd_qid_set_type(oh, i, type);
+ oh->ot_id_cnt++;
+ obj = osd_dt_obj(dt);
+ oh->ot_credits += (allocated || id == 0) ?
+ 1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj)));
+#endif
+}
+
/*
* OSD object methods.
*/
struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
struct osd_device *dev;
struct osd_inode_id *id;
- struct osd_oi *oi;
struct inode *inode;
int result;
LINVRNT(osd_invariant(obj));
LASSERT(obj->oo_inode == NULL);
- LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
+ LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid));
/*
* This assertion checks that osd layer sees only local
* fids. Unfortunately it is somewhat expensive (does a
info = osd_oti_get(env);
dev = osd_dev(ldev);
id = &info->oti_id;
- oi = &dev->od_oi;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
RETURN(-ENOENT);
- result = osd_oi_lookup(info, oi, fid, id);
+ result = osd_oi_lookup(info, osd_fid2oi(dev, fid), fid, id);
if (result != 0) {
if (result == -ENOENT)
result = 0;
LINVRNT(osd_invariant(obj));
result = osd_fid_lookup(env, obj, lu_object_fid(l));
+ obj->oo_dt.do_body_ops = &osd_body_ops_new;
if (result == 0) {
if (obj->oo_inode != NULL)
osd_object_init0(obj);
* Concurrency: doesn't access mutable data.
*/
static int osd_param_is_sane(const struct osd_device *dev,
- const struct txn_param *param)
+ const struct thandle *th)
{
- return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
+ struct osd_thandle *oh;
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
}
/*
OBD_FREE_PTR(oh);
}
+static struct thandle *osd_trans_create(const struct lu_env *env,
+ struct dt_device *d)
+{
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct osd_thandle *oh;
+ struct thandle *th;
+ ENTRY;
+
+ th = ERR_PTR(-ENOMEM);
+ OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
+ if (oh != NULL) {
+ th = &oh->ot_super;
+ th->th_dev = d;
+ th->th_result = 0;
+ th->th_tags = LCT_TX_HANDLE;
+ oh->ot_credits = 0;
+ oti->oti_dev = osd_dt_dev(d);
+ CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
+ osd_th_alloced(oh);
+ }
+ RETURN(th);
+}
+
/*
* Concurrency: shouldn't matter.
*/
-static struct thandle *osd_trans_start(const struct lu_env *env,
- struct dt_device *d,
- struct txn_param *p)
+int osd_trans_start(const struct lu_env *env, struct dt_device *d,
+ struct thandle *th)
{
+ struct osd_thread_info *oti = osd_oti_get(env);
struct osd_device *dev = osd_dt_dev(d);
handle_t *jh;
struct osd_thandle *oh;
- struct thandle *th;
- int hook_res;
+ int rc;
ENTRY;
- hook_res = dt_txn_hook_start(env, d, p);
- if (hook_res != 0)
- RETURN(ERR_PTR(hook_res));
+ LASSERT(current->journal_info == NULL);
- if (osd_param_is_sane(dev, p)) {
- OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
- if (oh != NULL) {
- struct osd_thread_info *oti = osd_oti_get(env);
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh != NULL);
+ LASSERT(oh->ot_handle == NULL);
- /*
- * XXX temporary stuff. Some abstraction layer should
- * be used.
- */
- oti->oti_dev = dev;
- CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
- osd_th_alloced(oh);
- jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
- osd_th_started(oh);
- if (!IS_ERR(jh)) {
- oh->ot_handle = jh;
- th = &oh->ot_super;
- th->th_dev = d;
- th->th_result = 0;
- th->th_sync = 0;
- lu_device_get(&d->dd_lu_dev);
- oh->ot_dev_link = lu_ref_add
- (&d->dd_lu_dev.ld_reference,
- "osd-tx", th);
- /* add commit callback */
- lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
- lu_context_enter(&th->th_ctx);
- LASSERT(oti->oti_txns == 0);
- LASSERT(oti->oti_r_locks == 0);
- LASSERT(oti->oti_w_locks == 0);
- oti->oti_txns++;
- } else {
- OBD_FREE_PTR(oh);
- th = (void *)jh;
- }
- } else
- th = ERR_PTR(-ENOMEM);
- } else {
- CERROR("Invalid transaction parameters\n");
- th = ERR_PTR(-EINVAL);
+ rc = dt_txn_hook_start(env, d, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (!osd_param_is_sane(dev, th)) {
+ CWARN("%s: too many transaction credits (%d > %d)\n",
+ d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
+ osd_journal(dev)->j_max_transaction_buffers);
+ /* XXX Limit the credits to 'max_transaction_buffers', and
+ * let the underlying filesystem to catch the error if
+ * we really need so many credits.
+ *
+ * This should be removed when we can calculate the
+ * credits precisely. */
+ oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
+#ifdef OSD_TRACK_DECLARES
+ CERROR(" attr_set: %d, punch: %d, xattr_set: %d,\n",
+ oh->ot_declare_attr_set, oh->ot_declare_punch,
+ oh->ot_declare_xattr_set);
+ CERROR(" create: %d, ref_add: %d, ref_del: %d, write: %d\n",
+ oh->ot_declare_create, oh->ot_declare_ref_add,
+ oh->ot_declare_ref_del, oh->ot_declare_write);
+ CERROR(" insert: %d, delete: %d, destroy: %d\n",
+ oh->ot_declare_insert, oh->ot_declare_delete,
+ oh->ot_declare_destroy);
+#endif
}
- RETURN(th);
+ /*
+ * XXX temporary stuff. Some abstraction layer should
+ * be used.
+ */
+ jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
+ osd_th_started(oh);
+ if (!IS_ERR(jh)) {
+ oh->ot_handle = jh;
+ LASSERT(oti->oti_txns == 0);
+ lu_context_init(&th->th_ctx, th->th_tags);
+ lu_context_enter(&th->th_ctx);
+
+ lu_device_get(&d->dd_lu_dev);
+ oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
+ "osd-tx", th);
+
+ /*
+ * XXX: current rule is that we first start tx,
+ * then lock object(s), but we can't use
+ * this rule for data (due to locking specifics
+ * in ldiskfs). also in long-term we'd like to
+ * use usually-used (locks;tx) ordering. so,
+ * UGLY thing is that we'll use one ordering for
+ * data (ofd) and reverse ordering for metadata
+ * (mdd). then at some point we'll fix the latter
+ */
+ if (lu_device_is_md(&d->dd_lu_dev)) {
+ LASSERT(oti->oti_r_locks == 0);
+ LASSERT(oti->oti_w_locks == 0);
+ }
+
+ oti->oti_txns++;
+ rc = 0;
+ } else {
+ rc = PTR_ERR(jh);
+ }
+out:
+ RETURN(rc);
}
/*
* Concurrency: shouldn't matter.
*/
-static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
{
- int result;
- struct osd_thandle *oh;
+ int rc = 0;
+ struct osd_thandle *oh;
struct osd_thread_info *oti = osd_oti_get(env);
ENTRY;
oh = container_of0(th, struct osd_thandle, ot_super);
+
if (oh->ot_handle != NULL) {
handle_t *hdl = oh->ot_handle;
hdl->h_sync = th->th_sync;
+
/*
* add commit callback
* notice we don't do this in osd_trans_start()
LASSERT(oti->oti_txns == 1);
oti->oti_txns--;
- LASSERT(oti->oti_r_locks == 0);
- LASSERT(oti->oti_w_locks == 0);
- result = dt_txn_hook_stop(env, th);
- if (result != 0)
- CERROR("Failure in transaction hook: %d\n", result);
+ /*
+ * XXX: current rule is that we first start tx,
+ * then lock object(s), but we can't use
+ * this rule for data (due to locking specifics
+ * in ldiskfs). also in long-term we'd like to
+ * use usually-used (locks;tx) ordering. so,
+ * UGLY thing is that we'll use one ordering for
+ * data (ofd) and reverse ordering for metadata
+ * (mdd). then at some point we'll fix the latter
+ */
+ if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
+ LASSERT(oti->oti_r_locks == 0);
+ LASSERT(oti->oti_w_locks == 0);
+ }
+ rc = dt_txn_hook_stop(env, th);
+ if (rc != 0)
+ CERROR("Failure in transaction hook: %d\n", rc);
oh->ot_handle = NULL;
OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
- result = ldiskfs_journal_stop(hdl));
- if (result != 0)
- CERROR("Failure to stop transaction: %d\n", result);
+ rc = ldiskfs_journal_stop(hdl));
+ if (rc != 0)
+ CERROR("Failure to stop transaction: %d\n", rc);
} else {
OBD_FREE_PTR(oh);
}
- EXIT;
+
+ RETURN(rc);
}
static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
}
/*
- * Concurrency: no concurrent access is possible that late in object
- * life-cycle.
- */
-static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
-{
- const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
- struct osd_device *osd = osd_obj2dev(obj);
- struct osd_thread_info *oti = osd_oti_get(env);
- struct txn_param *prm = &oti->oti_txn;
- struct lu_env *env_del_obj = &oti->oti_obj_delete_tx_env;
- struct thandle *th;
- int result;
-
- lu_env_init(env_del_obj, LCT_DT_THREAD);
- txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
- OSD_TXN_INODE_DELETE_CREDITS);
- th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
- if (!IS_ERR(th)) {
- result = osd_oi_delete(osd_oti_get(env_del_obj),
- &osd->od_oi, fid, th);
- osd_trans_stop(env_del_obj, th);
- } else
- result = PTR_ERR(th);
-
- lu_env_fini(env_del_obj);
- return result;
-}
-
-/*
* Called just before object is freed. Releases all resources except for
* object itself (that is released by osd_object_free()).
*
osd_index_fini(obj);
if (inode != NULL) {
- int result;
-
- if (osd_inode_unlinked(inode)) {
- result = osd_inode_remove(env, obj);
- if (result != 0)
- LU_OBJECT_DEBUG(D_ERROR, env, l,
- "Failed to cleanup: %d\n",
- result);
- }
-
iput(inode);
obj->oo_inode = NULL;
}
static void osd_object_release(const struct lu_env *env,
struct lu_object *l)
{
- struct osd_object *o = osd_obj(l);
-
- LASSERT(!lu_object_is_dying(l->lo_header));
- if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
- cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
}
/*
param->ddp_mntopts |= MNTOPT_USERXATTR;
if (test_opt(sb, POSIX_ACL))
param->ddp_mntopts |= MNTOPT_ACL;
+
+#if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
+ if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
+ param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
+ else
+#endif
+ param->ddp_max_ea_size = sb->s_blocksize;
+
}
/**
/*
* Concurrency: shouldn't matter.
*/
-lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
static void osd_ro(const struct lu_env *env, struct dt_device *d)
{
+ struct super_block *sb = osd_sb(osd_dt_dev(d));
ENTRY;
CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
- __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
- fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
+ __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
EXIT;
}
-
/*
* Concurrency: serialization provided by callers.
*/
/**
* Unused now
*/
- [DTO_IDNEX_UPDATE] = 16,
+ [DTO_INDEX_UPDATE] = 16,
/**
* Create a object. The same as create object in EXT3.
* DATA_TRANS_BLOCKS(14) +
*/
[DTO_OBJECT_CREATE] = 25,
/**
- * Unused now
+ * XXX: real credits to be fixed
*/
[DTO_OBJECT_DELETE] = 25,
/**
- * Attr set credits.
- * 3(inode bits, group, GDT)
+ * Attr set credits (inode)
*/
- [DTO_ATTR_SET_BASE] = 3,
+ [DTO_ATTR_SET_BASE] = 1,
/**
* Xattr set. The same as xattr of EXT3.
* DATA_TRANS_BLOCKS(14)
[DTO_XATTR_SET] = 14,
[DTO_LOG_REC] = 14,
/**
- * creadits for inode change during write.
+ * credits for inode change during write.
*/
[DTO_WRITE_BASE] = 3,
/**
[DTO_ATTR_SET_CHOWN]= 0
};
-/**
- * Note: we count into QUOTA here.
- * If we mount with --data_journal we may need more.
- */
-static const int osd_dto_credits_quota[DTO_NR] = {
- /**
- * INDEX_EXTRA_TRANS_BLOCKS(8) +
- * SINGLEDATA_TRANS_BLOCKS(8) +
- * 2 * QUOTA_TRANS_BLOCKS(2)
- */
- [DTO_INDEX_INSERT] = 20,
- /**
- * INDEX_EXTRA_TRANS_BLOCKS(8) +
- * SINGLEDATA_TRANS_BLOCKS(8) +
- * 2 * QUOTA_TRANS_BLOCKS(2)
- */
- [DTO_INDEX_DELETE] = 20,
- /**
- * Unused now.
- */
- [DTO_IDNEX_UPDATE] = 16,
- /*
- * Create a object. Same as create object in EXT3 filesystem.
- * DATA_TRANS_BLOCKS(16) +
- * INDEX_EXTRA_BLOCKS(8) +
- * 3(inode bits, groups, GDT) +
- * 2 * QUOTA_INIT_BLOCKS(25)
- */
- [DTO_OBJECT_CREATE] = 77,
- /*
- * Unused now.
- * DATA_TRANS_BLOCKS(16) +
- * INDEX_EXTRA_BLOCKS(8) +
- * 3(inode bits, groups, GDT) +
- * QUOTA(?)
- */
- [DTO_OBJECT_DELETE] = 27,
- /**
- * Attr set credits.
- * 3 (inode bit, group, GDT) +
- */
- [DTO_ATTR_SET_BASE] = 3,
- /**
- * Xattr set. The same as xattr of EXT3.
- * DATA_TRANS_BLOCKS(16)
- * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
- * also counted in. Do not know why?
- */
- [DTO_XATTR_SET] = 16,
- [DTO_LOG_REC] = 16,
- /**
- * creadits for inode change during write.
- */
- [DTO_WRITE_BASE] = 3,
- /**
- * credits for single block write.
- */
- [DTO_WRITE_BLOCK] = 16,
- /**
- * Attr set credits for chown.
- * It is added to already set setattr credits
- * 2 * QUOTA_INIT_BLOCKS(25) +
- * 2 * QUOTA_DEL_BLOCKS(9)
- */
- [DTO_ATTR_SET_CHOWN]= 68,
-};
-
-static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
- enum dt_txn_op op)
-{
- LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
- ARRAY_SIZE(osd_dto_credits_quota));
- LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
-#ifdef HAVE_QUOTA_SUPPORT
- if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
- return osd_dto_credits_quota[op];
- else
-#endif
- return osd_dto_credits_noquota[op];
-}
-
static const struct dt_device_operations osd_dt_ops = {
.dt_root_get = osd_root_get,
.dt_statfs = osd_statfs,
+ .dt_trans_create = osd_trans_create,
.dt_trans_start = osd_trans_start,
.dt_trans_stop = osd_trans_stop,
.dt_trans_cb_add = osd_trans_cb_add,
.dt_sync = osd_sync,
.dt_ro = osd_ro,
.dt_commit_async = osd_commit_async,
- .dt_credit_get = osd_credit_get,
.dt_init_capa_ctxt = osd_init_capa_ctxt,
.dt_init_quota_ctxt= osd_init_quota_ctxt,
};
return 0;
}
+static int osd_declare_attr_set(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_attr *attr,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+ struct osd_object *obj;
+
+ LASSERT(dt != NULL);
+ LASSERT(handle != NULL);
+
+ obj = osd_dt_obj(dt);
+ LASSERT(osd_invariant(obj));
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, attr_set);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+ if (attr && attr->la_valid & LA_UID) {
+ if (obj->oo_inode)
+ osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid,
+ obj->oo_inode);
+ osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
+ }
+ if (attr && attr->la_valid & LA_GID) {
+ if (obj->oo_inode)
+ osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid,
+ obj->oo_inode);
+ osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
+ }
+
+ return 0;
+}
+
static int osd_inode_setattr(const struct lu_env *env,
struct inode *inode, const struct lu_attr *attr)
{
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
+ OSD_EXEC_OP(handle, attr_set);
+
cfs_spin_lock(&obj->oo_guard);
rc = osd_inode_setattr(env, obj->oo_inode, attr);
cfs_spin_unlock(&obj->oo_guard);
id->oii_ino = obj->oo_inode->i_ino;
id->oii_gen = obj->oo_inode->i_generation;
- return osd_oi_insert(info, &osd->od_oi, fid, id, th,
+ return osd_oi_insert(info, osd_fid2oi(osd, fid), fid, id, th,
uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
}
+static int osd_declare_object_create(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, create);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
+ /* XXX: So far, only normal fid needs be inserted into the oi,
+ * things could be changed later. Revise following code then. */
+ if (fid_is_norm(lu_object_fid(&dt->do_lu))) {
+ OSD_DECLARE_OP(oh, insert);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+ }
+ /* If this is directory, then we expect . and .. to be inserted as
+ * well. The one directory block always needs to be created for the
+ * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
+ * block), there is no danger of needing a tree for the first block.
+ */
+ if (attr && S_ISDIR(attr->la_mode)) {
+ OSD_DECLARE_OP(oh, insert);
+ OSD_DECLARE_OP(oh, insert);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE];
+ }
+
+ if (attr) {
+ osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
+ osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
+ }
+ return 0;
+}
+
static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, create);
+
result = __osd_object_create(info, obj, attr, hint, dof, th);
if (result == 0)
result = __osd_oi_insert(env, obj, fid, th);
}
/**
+ * Called to destroy on-disk representation of the object
+ *
+ * Concurrency: must be locked
+ */
+static int osd_declare_object_destroy(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct inode *inode = obj->oo_inode;
+ struct osd_thandle *oh;
+ ENTRY;
+
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+ LASSERT(inode);
+
+ OSD_DECLARE_OP(oh, destroy);
+ OSD_DECLARE_OP(oh, delete);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+ osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode);
+ osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode);
+
+ RETURN(0);
+}
+
+static int osd_object_destroy(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
+{
+ const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct inode *inode = obj->oo_inode;
+ struct osd_device *osd = osd_obj2dev(obj);
+ struct osd_thandle *oh;
+ int result;
+ ENTRY;
+
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle);
+ LASSERT(inode);
+ LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
+
+ if (S_ISDIR(inode->i_mode)) {
+ LASSERT(osd_inode_unlinked(inode) ||
+ inode->i_nlink == 1);
+ cfs_spin_lock(&obj->oo_guard);
+ inode->i_nlink = 0;
+ cfs_spin_unlock(&obj->oo_guard);
+ inode->i_sb->s_op->dirty_inode(inode);
+ } else {
+ LASSERT(osd_inode_unlinked(inode));
+ }
+
+ OSD_EXEC_OP(th, destroy);
+
+ result = osd_oi_delete(osd_oti_get(env),
+ osd_fid2oi(osd, fid), fid, th);
+
+ /* XXX: add to ext3 orphan list */
+ /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
+
+ /* not needed in the cache anymore */
+ set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
+
+ RETURN(0);
+}
+
+/**
* Helper function for osd_xattr_set()
*/
static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, create);
+
result = __osd_object_create(info, obj, attr, hint, dof, th);
/* objects under osd root shld have igif fid, so dont add fid EA */
RETURN(result);
}
+static int osd_declare_object_ref_add(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ /* it's possible that object doesn't exist yet */
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, ref_add);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+ return 0;
+}
+
/*
* Concurrency: @dt is write locked.
*/
-static void osd_object_ref_add(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
+static int osd_object_ref_add(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, ref_add);
+
+ /*
+ * DIR_NLINK feature is set for compatibility reasons if:
+ * 1) nlinks > LDISKFS_LINK_MAX, or
+ * 2) nlinks == 2, since this indicates i_nlink was previously 1.
+ *
+ * It is easier to always set this flag (rather than check and set),
+ * since it has less overhead, and the superblock will be dirtied
+ * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
+ * do not actually care whether this flag is set or not.
+ */
cfs_spin_lock(&obj->oo_guard);
- LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
inode->i_nlink++;
+ if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
+ if (inode->i_nlink >= LDISKFS_LINK_MAX ||
+ inode->i_nlink == 2)
+ inode->i_nlink = 1;
+ }
+ LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
cfs_spin_unlock(&obj->oo_guard);
inode->i_sb->s_op->dirty_inode(inode);
LINVRNT(osd_invariant(obj));
+
+ return 0;
+}
+
+static int osd_declare_object_ref_del(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, ref_del);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+ return 0;
}
/*
* Concurrency: @dt is write locked.
*/
-static void osd_object_ref_del(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
+static int osd_object_ref_del(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, ref_del);
+
cfs_spin_lock(&obj->oo_guard);
LASSERT(inode->i_nlink > 0);
inode->i_nlink--;
+ /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
+ * then the nlink count is 1. Don't let it be set to 0 or the directory
+ * inode will be deleted incorrectly. */
+ if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
+ inode->i_nlink++;
cfs_spin_unlock(&obj->oo_guard);
inode->i_sb->s_op->dirty_inode(inode);
LINVRNT(osd_invariant(obj));
+
+ return 0;
+}
+
+/*
+ * Get the 64-bit version for an inode.
+ */
+static int osd_object_version_get(const struct lu_env *env,
+ struct dt_object *dt, dt_obj_version_t *ver)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
+ LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+ *ver = LDISKFS_I(inode)->i_fs_version;
+ return 0;
}
/*
struct osd_thread_info *info = osd_oti_get(env);
struct dentry *dentry = &info->oti_obj_dentry;
+ /* version get is not real XATTR but uses xattr API */
+ if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+ /* for version we are just using xattr API but change inode
+ * field instead */
+ LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+ osd_object_version_get(env, dt, buf->lb_buf);
+ return sizeof(dt_obj_version_t);
+ }
+
LASSERT(dt_object_exists(dt));
LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
}
+
+static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int fl, struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(handle != NULL);
+
+ if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+ /* no credits for version */
+ return 0;
+ }
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, xattr_set);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+ return 0;
+}
+
+/*
+ * Set the 64-bit version for object
+ */
+static void osd_object_version_set(const struct lu_env *env,
+ struct dt_object *dt,
+ dt_obj_version_t *new_version)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+ *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+
+ LDISKFS_I(inode)->i_fs_version = *new_version;
+ /** Version is set after all inode operations are finished,
+ * so we should mark it dirty here */
+ inode->i_sb->s_op->dirty_inode(inode);
+}
+
/*
* Concurrency: @dt is write locked.
*/
{
LASSERT(handle != NULL);
+ /* version set is not real XATTR */
+ if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+ /* for version we are just using xattr API but change inode
+ * field instead */
+ LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+ osd_object_version_set(env, dt, buf->lb_buf);
+ return sizeof(dt_obj_version_t);
+ }
+
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
+ OSD_EXEC_OP(handle, xattr_set);
return __osd_xattr_set(env, dt, buf, name, fl);
}
return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
}
+static int osd_declare_xattr_del(const struct lu_env *env,
+ struct dt_object *dt,
+ const char *name,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, xattr_set);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+ return 0;
+}
+
/*
* Concurrency: @dt is write locked.
*/
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
+ OSD_EXEC_OP(handle, xattr_set);
+
dentry->d_inode = inode;
rc = inode->i_op->removexattr(dentry, name);
return rc;
RETURN(rc);
}
-/*
- * Get the 64-bit version for an inode.
- */
-static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
- struct dt_object *dt)
-{
- struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
- CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
- LDISKFS_I(inode)->i_fs_version, inode->i_ino);
- return LDISKFS_I(inode)->i_fs_version;
-}
-
-/*
- * Set the 64-bit version and return the old version.
- */
-static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
- dt_obj_version_t new_version)
-{
- struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
- CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
- new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
- LDISKFS_I(inode)->i_fs_version = new_version;
- /** Version is set after all inode operations are finished,
- * so we should mark it dirty here */
- inode->i_sb->s_op->dirty_inode(inode);
-}
-
static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
void **data)
{
}
static const struct dt_object_operations osd_obj_ops = {
- .do_read_lock = osd_object_read_lock,
- .do_write_lock = osd_object_write_lock,
- .do_read_unlock = osd_object_read_unlock,
- .do_write_unlock = osd_object_write_unlock,
- .do_write_locked = osd_object_write_locked,
- .do_attr_get = osd_attr_get,
- .do_attr_set = osd_attr_set,
- .do_ah_init = osd_ah_init,
- .do_create = osd_object_create,
- .do_index_try = osd_index_try,
- .do_ref_add = osd_object_ref_add,
- .do_ref_del = osd_object_ref_del,
- .do_xattr_get = osd_xattr_get,
- .do_xattr_set = osd_xattr_set,
- .do_xattr_del = osd_xattr_del,
- .do_xattr_list = osd_xattr_list,
- .do_capa_get = osd_capa_get,
- .do_object_sync = osd_object_sync,
- .do_version_get = osd_object_version_get,
- .do_version_set = osd_object_version_set,
- .do_data_get = osd_data_get,
+ .do_read_lock = osd_object_read_lock,
+ .do_write_lock = osd_object_write_lock,
+ .do_read_unlock = osd_object_read_unlock,
+ .do_write_unlock = osd_object_write_unlock,
+ .do_write_locked = osd_object_write_locked,
+ .do_attr_get = osd_attr_get,
+ .do_declare_attr_set = osd_declare_attr_set,
+ .do_attr_set = osd_attr_set,
+ .do_ah_init = osd_ah_init,
+ .do_declare_create = osd_declare_object_create,
+ .do_create = osd_object_create,
+ .do_declare_destroy = osd_declare_object_destroy,
+ .do_destroy = osd_object_destroy,
+ .do_index_try = osd_index_try,
+ .do_declare_ref_add = osd_declare_object_ref_add,
+ .do_ref_add = osd_object_ref_add,
+ .do_declare_ref_del = osd_declare_object_ref_del,
+ .do_ref_del = osd_object_ref_del,
+ .do_xattr_get = osd_xattr_get,
+ .do_declare_xattr_set = osd_declare_xattr_set,
+ .do_xattr_set = osd_xattr_set,
+ .do_declare_xattr_del = osd_declare_xattr_del,
+ .do_xattr_del = osd_xattr_del,
+ .do_xattr_list = osd_xattr_list,
+ .do_capa_get = osd_capa_get,
+ .do_object_sync = osd_object_sync,
+ .do_data_get = osd_data_get,
};
/**
* (i.e. to run 2.0 mds on 1.8 disk) (b11826)
*/
static const struct dt_object_operations osd_obj_ea_ops = {
- .do_read_lock = osd_object_read_lock,
- .do_write_lock = osd_object_write_lock,
- .do_read_unlock = osd_object_read_unlock,
- .do_write_unlock = osd_object_write_unlock,
- .do_write_locked = osd_object_write_locked,
- .do_attr_get = osd_attr_get,
- .do_attr_set = osd_attr_set,
- .do_ah_init = osd_ah_init,
- .do_create = osd_object_ea_create,
- .do_index_try = osd_index_try,
- .do_ref_add = osd_object_ref_add,
- .do_ref_del = osd_object_ref_del,
- .do_xattr_get = osd_xattr_get,
- .do_xattr_set = osd_xattr_set,
- .do_xattr_del = osd_xattr_del,
- .do_xattr_list = osd_xattr_list,
- .do_capa_get = osd_capa_get,
- .do_object_sync = osd_object_sync,
- .do_version_get = osd_object_version_get,
- .do_version_set = osd_object_version_set,
- .do_data_get = osd_data_get,
+ .do_read_lock = osd_object_read_lock,
+ .do_write_lock = osd_object_write_lock,
+ .do_read_unlock = osd_object_read_unlock,
+ .do_write_unlock = osd_object_write_unlock,
+ .do_write_locked = osd_object_write_locked,
+ .do_attr_get = osd_attr_get,
+ .do_declare_attr_set = osd_declare_attr_set,
+ .do_attr_set = osd_attr_set,
+ .do_ah_init = osd_ah_init,
+ .do_declare_create = osd_declare_object_create,
+ .do_create = osd_object_ea_create,
+ .do_declare_destroy = osd_declare_object_destroy,
+ .do_destroy = osd_object_destroy,
+ .do_index_try = osd_index_try,
+ .do_declare_ref_add = osd_declare_object_ref_add,
+ .do_ref_add = osd_object_ref_add,
+ .do_declare_ref_del = osd_declare_object_ref_del,
+ .do_ref_del = osd_object_ref_del,
+ .do_xattr_get = osd_xattr_get,
+ .do_declare_xattr_set = osd_declare_xattr_set,
+ .do_xattr_set = osd_xattr_set,
+ .do_declare_xattr_del = osd_declare_xattr_del,
+ .do_xattr_del = osd_xattr_del,
+ .do_xattr_list = osd_xattr_list,
+ .do_capa_get = osd_capa_get,
+ .do_object_sync = osd_object_sync,
+ .do_data_get = osd_data_get,
};
/*
return err;
}
+static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
+ const loff_t size, loff_t pos,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, write);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+
+ if (osd_dt_obj(dt)->oo_inode == NULL)
+ return 0;
+
+ osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
+ osd_dt_obj(dt)->oo_inode);
+ osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
+ osd_dt_obj(dt)->oo_inode);
+ return 0;
+}
+
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
struct thandle *handle, struct lustre_capa *capa,
return result;
}
+/*
+ * in some cases we may need declare methods for objects being created
+ * e.g., when we create symlink
+ */
+static const struct dt_body_operations osd_body_ops_new = {
+ .dbo_declare_write = osd_declare_write,
+};
+
static const struct dt_body_operations osd_body_ops = {
- .dbo_read = osd_read,
- .dbo_write = osd_write
+ .dbo_read = osd_read,
+ .dbo_declare_write = osd_declare_write,
+ .dbo_write = osd_write
};
+static int osd_index_declare_iam_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, delete);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+ return 0;
+}
/**
* delete a (key, value) pair from index \a dt specified by \a key
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
RETURN(-EACCES);
+ OSD_EXEC_OP(handle, delete);
+
ipd = osd_idx_ipd_get(env, bag);
if (unlikely(ipd == NULL))
RETURN(-ENOMEM);
RETURN(rc);
}
+static int osd_index_declare_ea_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, delete);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+ LASSERT(osd_dt_obj(dt)->oo_inode);
+ osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
+ osd_dt_obj(dt)->oo_inode);
+ osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
+ osd_dt_obj(dt)->oo_inode);
+
+ return 0;
+}
+
static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
struct dt_rec *fid)
{
LASSERT(dt_object_exists(dt));
LASSERT(handle != NULL);
+ OSD_EXEC_OP(handle, delete);
+
oh = container_of(handle, struct osd_thandle, ot_super);
LASSERT(oh->ot_handle != NULL);
LASSERT(oh->ot_handle->h_transaction != NULL);
RETURN(rc);
}
+static int osd_index_declare_iam_insert(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, insert);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+ return 0;
+}
+
/**
* Inserts (key, value) pair in \a dt index object.
*
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
return -EACCES;
+ OSD_EXEC_OP(th, insert);
+
ipd = osd_idx_ipd_get(env, bag);
if (unlikely(ipd == NULL))
RETURN(-ENOMEM);
lu_object_put(env, &obj->oo_dt.do_lu);
}
+static int osd_index_declare_ea_insert(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, insert);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+ LASSERT(osd_dt_obj(dt)->oo_inode);
+ osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
+ osd_dt_obj(dt)->oo_inode);
+ osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
+ osd_dt_obj(dt)->oo_inode);
+
+ return 0;
+}
+
/**
* Index add function for interoperability mode (b11826).
* It will add the directory entry.This entry is needed to
*/
static int osd_it_iam_rec(const struct lu_env *env,
const struct dt_it *di,
- struct lu_dirent *lde,
+ struct dt_rec *dtrec,
__u32 attr)
{
struct osd_it_iam *it = (struct osd_it_iam *)di;
struct osd_thread_info *info = osd_oti_get(env);
struct lu_fid *fid = &info->oti_fid;
const struct osd_fid_pack *rec;
+ struct lu_dirent *lde = (struct lu_dirent *)dtrec;
char *name;
int namelen;
__u64 hash;
}
static const struct dt_index_operations osd_index_iam_ops = {
- .dio_lookup = osd_index_iam_lookup,
- .dio_insert = osd_index_iam_insert,
- .dio_delete = osd_index_iam_delete,
+ .dio_lookup = osd_index_iam_lookup,
+ .dio_declare_insert = osd_index_declare_iam_insert,
+ .dio_insert = osd_index_iam_insert,
+ .dio_declare_delete = osd_index_declare_iam_delete,
+ .dio_delete = osd_index_iam_delete,
.dio_it = {
.init = osd_it_iam_init,
.fini = osd_it_iam_fini,
*/
static inline int osd_it_ea_rec(const struct lu_env *env,
const struct dt_it *di,
- struct lu_dirent *lde,
+ struct dt_rec *dtrec,
__u32 attr)
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
struct osd_object *obj = it->oie_obj;
struct lu_fid *fid = &it->oie_dirent->oied_fid;
+ struct lu_dirent *lde = (struct lu_dirent *)dtrec;
int rc = 0;
ENTRY;
* mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
*/
static const struct dt_index_operations osd_index_ea_ops = {
- .dio_lookup = osd_index_ea_lookup,
- .dio_insert = osd_index_ea_insert,
- .dio_delete = osd_index_ea_delete,
+ .dio_lookup = osd_index_ea_lookup,
+ .dio_declare_insert = osd_index_declare_ea_insert,
+ .dio_insert = osd_index_ea_insert,
+ .dio_declare_delete = osd_index_declare_ea_delete,
+ .dio_delete = osd_index_ea_delete,
.dio_it = {
.init = osd_it_ea_init,
.fini = osd_it_ea_fini,
lu_object_put(env, &o->od_obj_area->do_lu);
o->od_obj_area = NULL;
}
- osd_oi_fini(info, &o->od_oi);
+ if (o->od_oi_table != NULL)
+ osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
RETURN(0);
}
ENTRY;
/* 1. initialize oi before any file create or file open */
- result = osd_oi_init(oti, &osd->od_oi,
+ result = osd_oi_init(oti, &osd->od_oi_table,
&osd->od_dt_dev, lu2md_dev(pdev));
- if (result != 0)
+ if (result < 0)
RETURN(result);
+ LASSERT(result > 0);
+ osd->od_oi_count = result;
+
lmi = osd->od_mount;
lsi = s2lsi(lmi->lmi_sb);
ldd = lsi->lsi_ldd;