From a66e48fc9b3bf62ef7b1913def91fc68e99c773c Mon Sep 17 00:00:00 2001 From: Niu Yawei Date: Mon, 10 Sep 2012 02:30:34 -0400 Subject: [PATCH] LU-1842 quota: ldiskfs local enforcement Quota local enforcement for ldiskfs osd Signed-off-by: Niu Yawei Change-Id: Ica4c1ae74c4af5a913691e60c080cb431e273c3c Reviewed-on: http://review.whamcloud.com/3915 Tested-by: Hudson Reviewed-by: Johann Lombardi Tested-by: Maloo Reviewed-by: Fan Yong --- lustre/include/lquota.h | 61 +++++++++ lustre/include/lustre/lustre_idl.h | 2 + lustre/ofd/ofd_io.c | 28 +++- lustre/osd-ldiskfs/osd_handler.c | 266 ++++++++++++++++++++++++------------- lustre/osd-ldiskfs/osd_internal.h | 12 +- lustre/osd-ldiskfs/osd_io.c | 118 ++++++++++++---- lustre/osd-ldiskfs/osd_quota.c | 127 ++++++++++++++++++ 7 files changed, 486 insertions(+), 128 deletions(-) diff --git a/lustre/include/lquota.h b/lustre/include/lquota.h index 9f2e3e5..947e2bc 100644 --- a/lustre/include/lquota.h +++ b/lustre/include/lquota.h @@ -48,6 +48,50 @@ union lquota_rec { #define QUOTA_DATAPOOL_NAME "ost=" /* + * Quota information attached to a transaction + */ + +struct lquota_entry; + +struct lquota_id_info { + /* quota identifier */ + union lquota_id lqi_id; + + /* USRQUOTA or GRPQUOTA for now, could be expanded for + * directory quota or other types later. */ + int lqi_type; + + /* inodes or kbytes to be consumed or released, it could + * be negative when releasing space. */ + long long lqi_space; + + /* quota slave entry structure associated with this ID */ + struct lquota_entry *lqi_qentry; + + /* whether we are reporting blocks or inodes */ + bool lqi_is_blk; +}; + +/* Since we enforce only inode quota in meta pool (MDTs), and block quota in + * data pool (OSTs), there are at most 4 quota ids being enforced in a single + * transaction, which is chown transaction: + * original uid and gid, new uid and gid. + * + * This value might need to be revised when directory quota is added. 
*/ +#define QUOTA_MAX_TRANSIDS 4 + +/* all qids involved in a single transaction */ +struct lquota_trans { + unsigned short lqt_id_cnt; + struct lquota_id_info lqt_ids[QUOTA_MAX_TRANSIDS]; +}; + +/* flags for quota local enforcement */ +#define QUOTA_FL_OVER_USRQUOTA 0x01 +#define QUOTA_FL_OVER_GRPQUOTA 0x02 +#define QUOTA_FL_SYNC 0x04 + +/* * Quota enforcement support on slaves */ @@ -79,6 +123,23 @@ void qsd_fini(const struct lu_env *, struct qsd_instance *); int lquotactl_slv(const struct lu_env *, struct dt_device *, struct obd_quotactl *); +/* XXX: dummy qsd_op_begin() & qsd_op_end(), will be replaced with the real + * one once all the enforcement code landed. */ +static inline int qsd_op_begin(const struct lu_env *env, + struct qsd_instance *qsd, + struct lquota_trans *trans, + struct lquota_id_info *qi, + int *flags) +{ + return 0; +} + +static inline void qsd_op_end(const struct lu_env *env, + struct qsd_instance *qsd, + struct lquota_trans *trans) +{ +} + #ifdef LPROCFS /* dumb procfs handler which always report success, for backward compatibility * purpose */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 0622b44..cd4ed5f 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1527,6 +1527,8 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os); #define OBD_BRW_SRVLOCK 0x200 /* Client holds no lock over this page */ #define OBD_BRW_ASYNC 0x400 /* Server may delay commit to disk */ #define OBD_BRW_MEMALLOC 0x800 /* Client runs in the "kswapd" context */ +#define OBD_BRW_OVER_USRQUOTA 0x1000 /* Running out of user quota */ +#define OBD_BRW_OVER_GRPQUOTA 0x2000 /* Running out of group quota */ #define OBD_OBJECT_EOF 0xffffffffffffffffULL diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index fa74959..320deab 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -167,6 +167,9 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp, 
lnb[j+k].lnb_rc = -ENOSPC; if (!(rnb[i].rnb_flags & OBD_BRW_ASYNC)) oti->oti_sync_write = 1; + /* remote client can't break through quota */ + if (exp_connect_rmtclient(exp)) + lnb[j+k].lnb_flags &= ~OBD_BRW_NOQUOTA; } j += rc; LASSERT(j <= PTLRPC_MAX_BRW_PAGES); @@ -508,12 +511,27 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, else obdo_from_la(oa, &info->fti_attr, LA_GID | LA_UID); - if (ofd_grant_prohibit(exp, ofd)) - /* Trick to prevent clients from waiting for bulk write - * in flight since they won't get any grant in the reply - * anyway so they had better firing the sync write RPC - * straight away */ + /* don't report overquota flag if we failed before reaching + * commit */ + if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) { + /* return the overquota flags to client */ + if (lnb[0].lnb_flags & OBD_BRW_OVER_USRQUOTA) { + if (oa->o_valid & OBD_MD_FLFLAGS) + oa->o_flags |= OBD_FL_NO_USRQUOTA; + else + oa->o_flags = OBD_FL_NO_USRQUOTA; + } + + if (lnb[0].lnb_flags & OBD_BRW_OVER_GRPQUOTA) { + if (oa->o_valid & OBD_MD_FLFLAGS) + oa->o_flags |= OBD_FL_NO_GRPQUOTA; + else + oa->o_flags = OBD_FL_NO_GRPQUOTA; + } + + oa->o_valid |= OBD_MD_FLFLAGS; oa->o_valid |= OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA; + } } else if (cmd == OBD_BRW_READ) { struct ldlm_namespace *ns = ofd->ofd_namespace; diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 76a2d19..aee20d7 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -176,51 +176,6 @@ static int osd_root_get(const struct lu_env *env, return 0; } -static inline int osd_qid_type(struct osd_thandle *oh, int i) -{ - return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA; -} - -static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type) -{ - oh->ot_id_type |= ((type == GRPQUOTA) ? 
(1 << i) : 0); -} - -void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh, - int type, uid_t id, struct inode *inode) -{ -#ifdef CONFIG_QUOTA - int i, allocated = 0; - struct osd_object *obj; - - LASSERT(dt != NULL); - LASSERT(oh != NULL); - LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u", - oh->ot_id_cnt); - - /* id entry is allocated in the quota file */ - if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off) - allocated = 1; - - for (i = 0; i < oh->ot_id_cnt; i++) { - if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type) - return; - } - - if (unlikely(i >= OSD_MAX_UGID_CNT)) { - CERROR("more than %d uid/gids for a transaction?\n", i); - return; - } - - oh->ot_id_array[i] = id; - osd_qid_set_type(oh, i, type); - oh->ot_id_cnt++; - obj = osd_dt_obj(dt); - oh->ot_credits += (allocated || id == 0) ? - 1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj))); -#endif -} - /* * OSD object methods. */ @@ -745,6 +700,8 @@ static struct thandle *osd_trans_create(const struct lu_env *env, th = ERR_PTR(-ENOMEM); OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO); if (oh != NULL) { + oh->ot_quota_trans = &oti->oti_quota_trans; + memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans)); th = &oh->ot_super; th->th_dev = d; th->th_result = 0; @@ -855,11 +812,17 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th) struct osd_thandle *oh; struct osd_thread_info *oti = osd_oti_get(env); struct osd_iobuf *iobuf = &oti->oti_iobuf; - + struct qsd_instance *qsd = oti->oti_dev->od_quota_slave; ENTRY; oh = container_of0(th, struct osd_thandle, ot_super); + if (qsd != NULL) + /* inform the quota slave device that the transaction is + * stopping */ + qsd_op_end(env, qsd, oh->ot_quota_trans); + oh->ot_quota_trans = NULL; + if (oh->ot_handle != NULL) { handle_t *hdl = oh->ot_handle; @@ -1453,35 +1416,139 @@ static int osd_declare_attr_set(const struct lu_env *env, const struct lu_attr *attr, struct thandle *handle) { - struct 
osd_thandle *oh; - struct osd_object *obj; + struct osd_thandle *oh; + struct osd_object *obj; + struct osd_thread_info *info = osd_oti_get(env); + struct lquota_id_info *qi = &info->oti_qi; + long long bspace; + int rc = 0; + bool allocated; + ENTRY; - LASSERT(dt != NULL); - LASSERT(handle != NULL); + LASSERT(dt != NULL); + LASSERT(handle != NULL); - obj = osd_dt_obj(dt); - LASSERT(osd_invariant(obj)); + obj = osd_dt_obj(dt); + LASSERT(osd_invariant(obj)); - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle == NULL); + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle == NULL); - OSD_DECLARE_OP(oh, attr_set); - oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; + OSD_DECLARE_OP(oh, attr_set); + oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; - if (attr && attr->la_valid & LA_UID) { - if (obj->oo_inode) - osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid, - obj->oo_inode); - osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL); - } - if (attr && attr->la_valid & LA_GID) { - if (obj->oo_inode) - osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid, - obj->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL); - } + if (attr == NULL || obj->oo_inode == NULL) + RETURN(rc); - return 0; + bspace = obj->oo_inode->i_blocks; + bspace <<= obj->oo_inode->i_sb->s_blocksize_bits; + bspace = toqb(bspace); + + /* Changing ownership is always performed by super user, it should not + * fail with EDQUOT. + * + * We still need to call the osd_declare_qid() to calculate the journal + * credits for updating quota accounting files and to trigger quota + * space adjustment once the operation is completed.*/ + if ((attr->la_valid & LA_UID) != 0 && + attr->la_uid != obj->oo_inode->i_uid) { + qi->lqi_type = USRQUOTA; + + /* inode accounting */ + qi->lqi_is_blk = false; + + /* one more inode for the new owner ...
*/ + qi->lqi_id.qid_uid = attr->la_uid; + qi->lqi_space = 1; + allocated = (attr->la_uid == 0) ? true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and one less inode for the current uid */ + qi->lqi_id.qid_uid = obj->oo_inode->i_uid; + qi->lqi_space = -1; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* block accounting */ + qi->lqi_is_blk = true; + + /* more blocks for the new owner ... */ + qi->lqi_id.qid_uid = attr->la_uid; + qi->lqi_space = bspace; + allocated = (attr->la_uid == 0) ? true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and finally less blocks for the current owner */ + qi->lqi_id.qid_uid = obj->oo_inode->i_uid; + qi->lqi_space = -bspace; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + } + + if (attr->la_valid & LA_GID && + attr->la_gid != obj->oo_inode->i_gid) { + qi->lqi_type = GRPQUOTA; + + /* inode accounting */ + qi->lqi_is_blk = false; + + /* one more inode for the new group owner ... */ + qi->lqi_id.qid_gid = attr->la_gid; + qi->lqi_space = 1; + allocated = (attr->la_gid == 0) ? true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and one less inode for the current gid */ + qi->lqi_id.qid_gid = obj->oo_inode->i_gid; + qi->lqi_space = -1; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* block accounting */ + qi->lqi_is_blk = true; + + /* more blocks for the new owner ... */ + qi->lqi_id.qid_gid = attr->la_gid; + qi->lqi_space = bspace; + allocated = (attr->la_gid == 0) ? 
true : false; + rc = osd_declare_qid(env, oh, qi, allocated, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + + /* and finally less blocks for the current owner */ + qi->lqi_id.qid_gid = obj->oo_inode->i_gid; + qi->lqi_space = -bspace; + rc = osd_declare_qid(env, oh, qi, true, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + RETURN(rc); + } + + RETURN(rc); } static int osd_inode_setattr(const struct lu_env *env, @@ -1952,7 +2019,9 @@ static int osd_declare_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *handle) { - struct osd_thandle *oh; + struct osd_thandle *oh; + int rc; + ENTRY; LASSERT(handle != NULL); @@ -1981,11 +2050,12 @@ static int osd_declare_object_create(const struct lu_env *env, oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE]; } - if (attr) { - osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL); - osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL); - } - return 0; + if (!attr) + RETURN(0); + + rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh, + false, false, NULL, false); + RETURN(rc); } static int osd_object_create(const struct lu_env *env, struct dt_object *dt, @@ -2034,7 +2104,7 @@ static int osd_declare_object_destroy(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thandle *oh; - + int rc; ENTRY; oh = container_of0(th, struct osd_thandle, ot_super); @@ -2053,10 +2123,15 @@ static int osd_declare_object_destroy(const struct lu_env *env, oh->ot_credits += 3; } - osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode); - osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode); - - RETURN(0); + /* one less inode */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh, + false, true, NULL, false); + if (rc) + RETURN(rc); + /* data to be truncated */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, true, + true, NULL, false); 
+ RETURN(rc); } static int osd_object_destroy(const struct lu_env *env, @@ -2932,6 +3007,9 @@ static int osd_index_declare_ea_delete(const struct lu_env *env, struct thandle *handle) { struct osd_thandle *oh; + struct inode *inode; + int rc; + ENTRY; LASSERT(dt_object_exists(dt)); LASSERT(handle != NULL); @@ -2942,13 +3020,12 @@ static int osd_index_declare_ea_delete(const struct lu_env *env, OSD_DECLARE_OP(oh, delete); oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE]; - LASSERT(osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, - osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, - osd_dt_obj(dt)->oo_inode); + inode = osd_dt_obj(dt)->oo_inode; + LASSERT(inode); - return 0; + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de, @@ -3524,6 +3601,9 @@ static int osd_index_declare_ea_insert(const struct lu_env *env, struct thandle *handle) { struct osd_thandle *oh; + struct inode *inode; + int rc; + ENTRY; LASSERT(dt_object_exists(dt)); LASSERT(handle != NULL); @@ -3534,13 +3614,15 @@ static int osd_index_declare_ea_insert(const struct lu_env *env, OSD_DECLARE_OP(oh, insert); oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT]; - LASSERT(osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, - osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, - osd_dt_obj(dt)->oo_inode); + inode = osd_dt_obj(dt)->oo_inode; + LASSERT(inode); - return 0; + /* We ignore block quota on meta pool (MDTs), so needn't + * calculate how many blocks will be consumed by this index + * insert */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } /** diff --git a/lustre/osd-ldiskfs/osd_internal.h 
b/lustre/osd-ldiskfs/osd_internal.h index add8dd4..898ee70 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -355,6 +355,7 @@ struct osd_thandle { unsigned short ot_id_cnt; unsigned short ot_id_type; uid_t ot_id_array[OSD_MAX_UGID_CNT]; + struct lquota_trans *ot_quota_trans; #ifdef OSD_TRACK_DECLARES unsigned char ot_declare_attr_set; @@ -613,6 +614,8 @@ struct osd_thread_info { struct if_dqblk oti_dqblk; struct if_dqinfo oti_dqinfo; }; + struct lquota_id_info oti_qi; + struct lquota_trans oti_quota_trans; }; extern int ldiskfs_pdo; @@ -632,8 +635,6 @@ int osd_statfs(const struct lu_env *env, struct dt_device *dev, struct obd_statfs *sfs); int osd_object_auth(const struct lu_env *env, struct dt_object *dt, struct lustre_capa *capa, __u64 opc); -void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh, - int type, uid_t id, struct inode *inode); struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev, struct osd_inode_id *id); struct inode *osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev, @@ -681,6 +682,13 @@ loff_t find_tree_dqentry(const struct lu_env *env, struct osd_object *obj, int type, qid_t dqid, uint blk, int depth, struct osd_it_quota *it); +/* osd_quota.c */ +int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh, + struct lquota_id_info *qi, bool allocated, int *flags); +int osd_declare_inode_qid(const struct lu_env *env, qid_t uid, qid_t gid, + long long space, struct osd_thandle *oh, + bool is_blk, bool allocated, int *flags, bool force); + /* * Invariants, assertions. 
*/ diff --git a/lustre/osd-ldiskfs/osd_io.c b/lustre/osd-ldiskfs/osd_io.c index af3d6af..11181b6 100644 --- a/lustre/osd-ldiskfs/osd_io.c +++ b/lustre/osd-ldiskfs/osd_io.c @@ -594,6 +594,31 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +/* Check if a block is allocated or not */ +static int osd_is_mapped(struct inode *inode, obd_size offset) +{ + sector_t (*fs_bmap)(struct address_space *, sector_t); + + fs_bmap = inode->i_mapping->a_ops->bmap; + + /* We can't know if we are overwriting or not */ + if (unlikely(fs_bmap == NULL)) + return 0; + + if (i_size_read(inode) == 0) + return 0; + + /* Beyond EOF, must not be mapped */ + if (((i_size_read(inode) - 1) >> inode->i_blkbits) < + (offset >> inode->i_blkbits)) + return 0; + + if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0) + return 0; + + return 1; +} + static int osd_declare_write_commit(const struct lu_env *env, struct dt_object *dt, struct niobuf_local *lnb, int npages, @@ -606,20 +631,36 @@ static int osd_declare_write_commit(const struct lu_env *env, int depth; int i; int newblocks; - int old; + int rc = 0; + int flags = 0; + bool ignore_quota = false; + long long quota_space = 0; + ENTRY; LASSERT(handle != NULL); oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle == NULL); - old = oh->ot_credits; newblocks = npages; /* calculate number of extents (probably better to pass nb) */ - for (i = 1; i < npages; i++) - if (lnb[i].offset != - lnb[i - 1].offset + lnb[i - 1].len) - extents++; + for (i = 0; i < npages; i++) { + if (i && lnb[i].offset != + lnb[i - 1].offset + lnb[i - 1].len) + extents++; + + if (!osd_is_mapped(inode, lnb[i].offset)) + quota_space += CFS_PAGE_SIZE; + + /* ignore quota for the whole request if any page is from + * client cache or written by root. + * + * XXX we could handle this on per-lnb basis as done by + * grant. 
*/ + if ((lnb[i].flags & OBD_BRW_NOQUOTA) || + !(lnb[i].flags & OBD_BRW_SYNC)) + ignore_quota = true; + } /* * each extent can go into new leaf causing a split @@ -643,6 +684,12 @@ static int osd_declare_write_commit(const struct lu_env *env, oh->ot_credits += depth * extents; } + /* quota space for metadata blocks */ + quota_space += depth * extents * LDISKFS_BLOCK_SIZE(osd_sb(osd)); + + /* quota space should be reported in 1K blocks */ + quota_space = toqb(quota_space); + /* each new block can go in different group (bitmap + gd) */ /* we can't dirty more bitmap blocks than exist */ @@ -657,26 +704,25 @@ static int osd_declare_write_commit(const struct lu_env *env, else oh->ot_credits += newblocks; - RETURN(0); -} + /* make sure the over quota flags were not set */ + lnb[0].flags &= ~(OBD_BRW_OVER_USRQUOTA | OBD_BRW_OVER_GRPQUOTA); -/* Check if a block is allocated or not */ -static int osd_is_mapped(struct inode *inode, obd_size offset) -{ - sector_t (*fs_bmap)(struct address_space *, sector_t); - - fs_bmap = inode->i_mapping->a_ops->bmap; - - /* We can't know if we are overwriting or not */ - if (fs_bmap == NULL) - return 0; + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, + quota_space, oh, true, true, &flags, + ignore_quota); - if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0) - return 0; + /* we need only to store the overquota flags in the first lnb for + * now, once we support multiple objects BRW, this code needs to be + * revised.
*/ + if (flags & QUOTA_FL_OVER_USRQUOTA) + lnb[0].flags |= OBD_BRW_OVER_USRQUOTA; + if (flags & QUOTA_FL_OVER_GRPQUOTA) + lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA; - return 1; + RETURN(rc); } +/* Check if a block is allocated or not */ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, struct niobuf_local *lnb, int npages, struct thandle *thandle) @@ -931,6 +977,9 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, { struct osd_thandle *oh; int credits; + struct inode *inode; + int rc; + ENTRY; LASSERT(handle != NULL); @@ -952,14 +1001,18 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, OSD_DECLARE_OP(oh, write); oh->ot_credits += credits; - if (osd_dt_obj(dt)->oo_inode == NULL) - return 0; + inode = osd_dt_obj(dt)->oo_inode; - osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, - osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, - osd_dt_obj(dt)->oo_inode); - return 0; + /* we may declare write to a non-existent llog */ + if (inode == NULL) + RETURN(0); + + /* dt_declare_write() is usually called for system objects, such + * as llog or last_rcvd files. We needn't enforce quota on those + * objects, so always set the lqi_space as 0.
*/ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen) @@ -1109,6 +1162,8 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, __u64 start, __u64 end, struct thandle *th) { struct osd_thandle *oh; + struct inode *inode; + int rc; ENTRY; LASSERT(th); @@ -1127,7 +1182,12 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; oh->ot_credits += 3; - RETURN(0); + inode = osd_dt_obj(dt)->oo_inode; + LASSERT(inode); + + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } static int osd_punch(const struct lu_env *env, struct dt_object *dt, diff --git a/lustre/osd-ldiskfs/osd_quota.c b/lustre/osd-ldiskfs/osd_quota.c index 1d26b48..5abfafa 100644 --- a/lustre/osd-ldiskfs/osd_quota.c +++ b/lustre/osd-ldiskfs/osd_quota.c @@ -415,3 +415,130 @@ const struct dt_index_operations osd_acct_index_ops = { } }; +static inline int osd_qid_type(struct osd_thandle *oh, int i) +{ + return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA; +} + +static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type) +{ + oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0); +} + +/** + * Reserve journal credits for quota files update first, then call + * ->op_begin() to perform quota enforcement. 
+ * + * \param env - the environment passed by the caller + * \param oh - osd transaction handle + * \param qi - quota id & space required for this operation + * \param allocated - dquot entry in quota accounting file has been allocated + * \param flags - if the operation is write, return no user quota, no + * group quota, or sync commit flags to the caller + * + * \retval 0 - success + * \retval -ve - failure + */ +int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh, + struct lquota_id_info *qi, bool allocated, int *flags) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct osd_device *dev = info->oti_dev; + struct qsd_instance *qsd = dev->od_quota_slave; + int i, rc; + bool found = false; + ENTRY; + + LASSERT(oh != NULL); + LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%d\n", + oh->ot_id_cnt); + + for (i = 0; i < oh->ot_id_cnt; i++) { + if (oh->ot_id_array[i] == qi->lqi_id.qid_uid && + osd_qid_type(oh, i) == qi->lqi_type) { + found = true; + break; + } + } + + if (!found) { + /* we need to account for credits for this new ID */ + if (i >= OSD_MAX_UGID_CNT) { + CERROR("Too many(%d) trans qids!\n", i + 1); + RETURN(-EOVERFLOW); + } + + oh->ot_credits += (allocated || qi->lqi_id.qid_uid == 0) ? + 1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev)); + + oh->ot_id_array[i] = qi->lqi_id.qid_uid; + osd_qid_set_type(oh, i, qi->lqi_type); + oh->ot_id_cnt++; + } + + if (unlikely(qsd == NULL)) + /* quota slave instance hasn't been allocated yet */ + RETURN(0); + + /* check quota */ + rc = qsd_op_begin(env, qsd, oh->ot_quota_trans, qi, flags); + RETURN(rc); +} + +/** + * Wrapper for osd_declare_qid() + * + * \param env - the environment passed by the caller + * \param uid - user id of the inode + * \param gid - group id of the inode + * \param space - how many blocks/inodes will be consumed/released + * \param oh - osd transaction handle + * \param is_blk - block quota or inode quota? 
+ * \param allocated - dquot entry in quota accounting file has been allocated + * \param flags - if the operation is write, return no user quota, no + * group quota, or sync commit flags to the caller + * \param force - set to true when changes are performed by root user and thus + * can't fail with EDQUOT + * + * \retval 0 - success + * \retval -ve - failure + */ +int osd_declare_inode_qid(const struct lu_env *env, qid_t uid, qid_t gid, + long long space, struct osd_thandle *oh, + bool is_blk, bool allocated, int *flags, bool force) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct lquota_id_info *qi = &info->oti_qi; + int rcu, rcg; /* user & group rc */ + ENTRY; + + /* let's start with user quota */ + qi->lqi_id.qid_uid = uid; + qi->lqi_type = USRQUOTA; + qi->lqi_space = space; + qi->lqi_is_blk = is_blk; + rcu = osd_declare_qid(env, oh, qi, allocated, flags); + + if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS)) + /* ignore EDQUOT & EINPROGRESS when changes are done by root */ + rcu = 0; + + /* For non-fatal error, we want to continue to get the noquota flags + * for group id. This is only for commit write, which has @flags passed + * in. See osd_declare_write_commit(). + * When force is set to true, we also want to proceed with the gid */ + if (rcu && (rcu != -EDQUOT || flags == NULL)) + RETURN(rcu); + + /* and now group quota */ + qi->lqi_id.qid_gid = gid; + qi->lqi_type = GRPQUOTA; + rcg = osd_declare_qid(env, oh, qi, allocated, flags); + + if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS)) + /* as before, ignore EDQUOT & EINPROGRESS for root */ + rcg = 0; + + RETURN(rcu ? rcu : rcg); +} + -- 1.8.3.1