Originally, the limit on ACL entries was 32, which is not
enough for some use cases. In fact, the ACL entry count is
restricted mainly so that the RPC reply buffer can be prepared
to receive the ACL data. So we cannot make the number of ACL
entries unlimited, but we can enlarge the RPC reply buffer to
hold more ACL entries. On the other hand, the MDT backend
filesystem has its own EA size limit. For example, in the
ldiskfs case, if large EA is enabled, the max ACL size is
1048492 bytes; otherwise it is 4012 bytes. For a ZFS backend,
the value is 32768 bytes. Given such a hard limit, we can
calculate the maximum number of ACL entries we can hold. This
patch increases the RPC reply buffer to match that hard limit.
To avoid buffer overflow on old clients caused by large ACL
data (more than 32 ACL entries), the MDT will forbid old
clients from accessing files with large ACL data. To
distinguish old clients from new ones, a new connection flag
OBD_CONNECT_LARGE_ACL is introduced for that purpose.
Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I557d2696a8df84bf83fcd3955292227d7aa0284c
Reviewed-on: https://review.whamcloud.com/19790
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Li Xi <lixi@ddn.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
23 files changed:
#define OBD_CONNECT_REQPORTAL 0x40ULL /*Separate non-IO req portal */
#define OBD_CONNECT_ACL 0x80ULL /*access control lists */
#define OBD_CONNECT_XATTR 0x100ULL /*client use extended attr */
#define OBD_CONNECT_REQPORTAL 0x40ULL /*Separate non-IO req portal */
#define OBD_CONNECT_ACL 0x80ULL /*access control lists */
#define OBD_CONNECT_XATTR 0x100ULL /*client use extended attr */
-#define OBD_CONNECT_CROW 0x200ULL /*MDS+OST create obj on write*/
+#define OBD_CONNECT_LARGE_ACL 0x200ULL /* more than 32 ACL entries */
#define OBD_CONNECT_TRUNCLOCK 0x400ULL /*locks on server for punch */
#define OBD_CONNECT_TRANSNO 0x800ULL /*replay sends init transno */
#define OBD_CONNECT_IBITS 0x1000ULL /*support for inodebits locks*/
#define OBD_CONNECT_TRUNCLOCK 0x400ULL /*locks on server for punch */
#define OBD_CONNECT_TRANSNO 0x800ULL /*replay sends init transno */
#define OBD_CONNECT_IBITS 0x1000ULL /*support for inodebits locks*/
OBD_CONNECT_DIR_STRIPE | \
OBD_CONNECT_BULK_MBITS | \
OBD_CONNECT_MULTIMODRPCS | \
OBD_CONNECT_DIR_STRIPE | \
OBD_CONNECT_BULK_MBITS | \
OBD_CONNECT_MULTIMODRPCS | \
- OBD_CONNECT_SUBTREE | \
+ OBD_CONNECT_SUBTREE | OBD_CONNECT_LARGE_ACL | \
OBD_CONNECT_FLAGS2)
#define MDT_CONNECT_SUPPORTED2 OBD_CONNECT2_FILE_SECCTX
OBD_CONNECT_FLAGS2)
#define MDT_CONNECT_SUPPORTED2 OBD_CONNECT2_FILE_SECCTX
#ifdef CONFIG_FS_POSIX_ACL
# include <linux/posix_acl_xattr.h>
# define LUSTRE_POSIX_ACL_MAX_ENTRIES 32
#ifdef CONFIG_FS_POSIX_ACL
# include <linux/posix_acl_xattr.h>
# define LUSTRE_POSIX_ACL_MAX_ENTRIES 32
-# define LUSTRE_POSIX_ACL_MAX_SIZE \
+# define LUSTRE_POSIX_ACL_MAX_SIZE_OLD \
(sizeof(posix_acl_xattr_header) + \
LUSTRE_POSIX_ACL_MAX_ENTRIES * sizeof(posix_acl_xattr_entry))
#endif /* CONFIG_FS_POSIX_ACL */
(sizeof(posix_acl_xattr_header) + \
LUSTRE_POSIX_ACL_MAX_ENTRIES * sizeof(posix_acl_xattr_entry))
#endif /* CONFIG_FS_POSIX_ACL */
-#ifndef LUSTRE_POSIX_ACL_MAX_SIZE
-# define LUSTRE_POSIX_ACL_MAX_SIZE 0
+#ifndef LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+# define LUSTRE_POSIX_ACL_MAX_SIZE_OLD 0
#endif /* LUSTRE_POSIX_ACL_MAX_SIZE */
#endif
#endif /* LUSTRE_POSIX_ACL_MAX_SIZE */
#endif
return ocd->ocd_ibits_known;
}
return ocd->ocd_ibits_known;
}
+static inline int exp_connect_large_acl(struct obd_export *exp)
+{
+ return !!(exp_connect_flags(exp) & OBD_CONNECT_LARGE_ACL);
+}
+
extern struct obd_export *class_conn2export(struct lustre_handle *conn);
extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
extern struct obd_export *class_conn2export(struct lustre_handle *conn);
extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
#endif
#ifdef CONFIG_FS_POSIX_ACL
data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
#endif
#ifdef CONFIG_FS_POSIX_ACL
- data->ocd_connect_flags |= OBD_CONNECT_ACL | OBD_CONNECT_UMASK;
+ data->ocd_connect_flags |= OBD_CONNECT_ACL | OBD_CONNECT_UMASK |
+ OBD_CONNECT_LARGE_ACL;
#endif
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_LIGHTWEIGHT))
#endif
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_LIGHTWEIGHT))
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
obddev->u.cli.cl_max_mds_easize);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
obddev->u.cli.cl_max_mds_easize);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+ req->rq_import->imp_connect_data.ocd_max_easize);
ptlrpc_request_set_replen(req);
return req;
}
ptlrpc_request_set_replen(req);
return req;
}
req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
RCL_SERVER, maxdata);
req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
RCL_SERVER, maxdata);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, maxdata);
+
ptlrpc_request_set_replen(req);
RETURN(req);
ptlrpc_request_set_replen(req);
RETURN(req);
mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+ req->rq_import->imp_connect_data.ocd_max_easize);
ptlrpc_request_set_replen(req);
RETURN(req);
}
ptlrpc_request_set_replen(req);
RETURN(req);
}
LTIME_S(op_data->op_attr.ia_ctime));
mdc_setattr_pack(req, op_data, ea, ealen);
LTIME_S(op_data->op_attr.ia_ctime));
mdc_setattr_pack(req, op_data, ea, ealen);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+ req->rq_import->imp_connect_data.ocd_max_easize);
ptlrpc_request_set_replen(req);
rc = mdc_reint(req, LUSTRE_IMP_FULL);
ptlrpc_request_set_replen(req);
rc = mdc_reint(req, LUSTRE_IMP_FULL);
mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
op_data->op_mode, -1, 0);
mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
op_data->op_mode, -1, 0);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+ req->rq_import->imp_connect_data.ocd_max_easize);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
op_data->op_mode);
ptlrpc_request_set_replen(req);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
op_data->op_mode);
ptlrpc_request_set_replen(req);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
op_data->op_mode);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
op_data->op_mode);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+ req->rq_import->imp_connect_data.ocd_max_easize);
ptlrpc_request_set_replen(req);
rc = mdc_getattr_common(exp, req);
ptlrpc_request_set_replen(req);
rc = mdc_getattr_common(exp, req);
LU_KEY_INIT(mdd, struct mdd_thread_info);
static void mdd_key_fini(const struct lu_context *ctx,
LU_KEY_INIT(mdd, struct mdd_thread_info);
static void mdd_key_fini(const struct lu_context *ctx,
- struct lu_context_key *key, void *data)
+ struct lu_context_key *key, void *data)
- struct mdd_thread_info *info = data;
+ struct mdd_thread_info *info = data;
lu_buf_free(&info->mti_big_buf);
lu_buf_free(&info->mti_link_buf);
lu_buf_free(&info->mti_big_buf);
lu_buf_free(&info->mti_link_buf);
+ lu_buf_free(&info->mti_xattr_buf);
}
/* context key: mdd_thread_key */
}
/* context key: mdd_thread_key */
if (IS_ERR(handle))
GOTO(out_free, rc = PTR_ERR(handle));
if (IS_ERR(handle))
GOTO(out_free, rc = PTR_ERR(handle));
- acl_buf.lb_buf = info->mti_xattr_buf;
- acl_buf.lb_len = sizeof(info->mti_xattr_buf);
+ lu_buf_check_and_alloc(&info->mti_xattr_buf,
+ mdd->mdd_dt_conf.ddp_max_ea_size);
+ acl_buf = info->mti_xattr_buf;
def_acl_buf.lb_buf = info->mti_key;
def_acl_buf.lb_len = sizeof(info->mti_key);
rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
def_acl_buf.lb_buf = info->mti_key;
def_acl_buf.lb_len = sizeof(info->mti_key);
rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
struct md_op_spec *spec = &info->mti_spec;
struct lu_buf lmm_buf = { NULL };
struct lu_buf link_buf = { NULL };
struct md_op_spec *spec = &info->mti_spec;
struct lu_buf lmm_buf = { NULL };
struct lu_buf link_buf = { NULL };
- const struct lu_buf *buf;
struct thandle *handle;
struct lmv_mds_md_v1 *mgr_ea;
struct lu_attr *la_flag = MDD_ENV_VAR(env, la_for_fix);
struct thandle *handle;
struct lmv_mds_md_v1 *mgr_ea;
struct lu_attr *la_flag = MDD_ENV_VAR(env, la_for_fix);
spec->sp_cr_lookup = 0;
spec->sp_feat = &dt_directory_features;
if (S_ISLNK(la->la_mode)) {
spec->sp_cr_lookup = 0;
spec->sp_feat = &dt_directory_features;
if (S_ISLNK(la->la_mode)) {
+ const struct lu_buf *buf;
+
buf = lu_buf_check_and_alloc(
&mdd_env_info(env)->mti_big_buf,
la->la_size + 1);
buf = lu_buf_check_and_alloc(
&mdd_env_info(env)->mti_big_buf,
la->la_size + 1);
- mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
+ mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
+ lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
+ mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
+ mgr_buf.lb_len = mgr_easize;
+ mgr_ea = mgr_buf.lb_buf;
memset(mgr_ea, 0, sizeof(*mgr_ea));
mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
mgr_ea->lmv_stripe_count = cpu_to_le32(2);
memset(mgr_ea, 0, sizeof(*mgr_ea));
mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
mgr_ea->lmv_stripe_count = cpu_to_le32(2);
* the last step of migration, so we set th_local = 1 to avoid
* update last rcvd for this transaction */
handle->th_local = 1;
* the last step of migration, so we set th_local = 1 to avoid
* update last rcvd for this transaction */
handle->th_local = 1;
- rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
- spec, la,
- (union lmv_mds_md *)info->mti_xattr_buf,
- ldata, handle);
+ rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
+ la, mgr_buf.lb_buf, ldata, handle);
if (rc != 0)
GOTO(stop_trans, rc);
if (rc != 0)
GOTO(stop_trans, rc);
/* Set MIGRATE EA on the source inode, so once the migration needs
* to be re-done during failover, the re-do process can locate the
* target object which is already being created. */
/* Set MIGRATE EA on the source inode, so once the migration needs
* to be re-done during failover, the re-do process can locate the
* target object which is already being created. */
- mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
- buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
- rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle);
+ rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
if (rc != 0)
GOTO(stop_trans, rc);
if (rc != 0)
GOTO(stop_trans, rc);
struct lu_buf mti_buf[4];
struct lu_buf mti_big_buf; /* biggish persistent buf */
struct lu_buf mti_link_buf; /* buf for link ea */
struct lu_buf mti_buf[4];
struct lu_buf mti_big_buf; /* biggish persistent buf */
struct lu_buf mti_link_buf; /* buf for link ea */
+ struct lu_buf mti_xattr_buf;
- char mti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
struct dt_allocation_hint mti_hint;
struct dt_object_format mti_dof;
struct linkea_data mti_link_data;
struct dt_allocation_hint mti_hint;
struct dt_object_format mti_dof;
struct linkea_data mti_link_data;
RETURN(-ENOMEM);
/* Read HSM attrs from disk */
RETURN(-ENOMEM);
/* Read HSM attrs from disk */
- CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
- current_buf = mdd_buf_get(env, info->mti_xattr_buf,
- sizeof(info->mti_xattr_buf));
+ current_buf = lu_buf_check_and_alloc(&info->mti_xattr_buf,
+ mdo2mdd(obj)->mdd_dt_conf.ddp_max_ea_size);
rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM);
rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
if (rc < 0 && rc != -ENODATA)
rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM);
rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
if (rc < 0 && rc != -ENODATA)
int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
struct thandle *handle)
{
int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
struct thandle *handle)
{
posix_acl_xattr_header *head;
posix_acl_xattr_entry *entry;
int entry_count;
posix_acl_xattr_header *head;
posix_acl_xattr_entry *entry;
int entry_count;
- buf = mdd_buf_get(env, mdd_env_info(env)->mti_xattr_buf,
- sizeof(mdd_env_info(env)->mti_xattr_buf));
-
- rc = mdo_xattr_get(env, o, buf, XATTR_NAME_ACL_ACCESS);
+ lu_buf_check_and_alloc(&mdd_env_info(env)->mti_xattr_buf,
+ mdd_obj2mdd_dev(o)->mdd_dt_conf.ddp_max_ea_size);
+ buf = mdd_env_info(env)->mti_xattr_buf;
+ rc = mdo_xattr_get(env, o, &buf, XATTR_NAME_ACL_ACCESS);
if ((rc == -EOPNOTSUPP) || (rc == -ENODATA))
RETURN(0);
else if (rc <= 0)
RETURN(rc);
if ((rc == -EOPNOTSUPP) || (rc == -ENODATA))
RETURN(0);
else if (rc <= 0)
RETURN(rc);
- buf->lb_len = rc;
- head = (posix_acl_xattr_header *)(buf->lb_buf);
+ buf.lb_len = rc;
+ head = (posix_acl_xattr_header *)(buf.lb_buf);
- entry_count = (buf->lb_len - sizeof(head->a_version)) /
+ entry_count = (buf.lb_len - sizeof(head->a_version)) /
sizeof(posix_acl_xattr_entry);
if (entry_count <= 0)
RETURN(0);
sizeof(posix_acl_xattr_entry);
if (entry_count <= 0)
RETURN(0);
- rc = mdo_xattr_set(env, o, buf, XATTR_NAME_ACL_ACCESS,
+ rc = mdo_xattr_set(env, o, &buf, XATTR_NAME_ACL_ACCESS,
0, handle);
RETURN(rc);
}
0, handle);
RETURN(rc);
}
struct lu_ucred *uc = lu_ucred_assert(env);
posix_acl_xattr_header *head;
posix_acl_xattr_entry *entry;
struct lu_ucred *uc = lu_ucred_assert(env);
posix_acl_xattr_header *head;
posix_acl_xattr_entry *entry;
int entry_count;
int rc;
ENTRY;
int entry_count;
int rc;
ENTRY;
- buf = mdd_buf_get(env, mdd_env_info(env)->mti_xattr_buf,
- sizeof(mdd_env_info(env)->mti_xattr_buf));
- rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS);
+ lu_buf_check_and_alloc(&mdd_env_info(env)->mti_xattr_buf,
+ mdd_obj2mdd_dev(obj)->mdd_dt_conf.ddp_max_ea_size);
+ buf = mdd_env_info(env)->mti_xattr_buf;
+ rc = mdo_xattr_get(env, obj, &buf, XATTR_NAME_ACL_ACCESS);
if (rc <= 0)
RETURN(rc ? : -EACCES);
if (rc <= 0)
RETURN(rc ? : -EACCES);
- buf->lb_len = rc;
- head = (posix_acl_xattr_header *)(buf->lb_buf);
+ buf.lb_len = rc;
+ head = (posix_acl_xattr_header *)(buf.lb_buf);
- entry_count = posix_acl_xattr_count(buf->lb_len);
+ entry_count = posix_acl_xattr_count(buf.lb_len);
/* Disregard empty ACLs and fall back to
* standard UNIX permissions. See LU-5434 */
/* Disregard empty ACLs and fall back to
* standard UNIX permissions. See LU-5434 */
const struct lu_env *env = info->mti_env;
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
const struct lu_env *env = info->mti_env;
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
+ struct mdt_device *mdt = info->mti_mdt;
buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
RCL_SERVER);
if (buf->lb_len == 0)
buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
RCL_SERVER);
if (buf->lb_len == 0)
rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
if (rc < 0) {
if (rc == -ENODATA) {
rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
if (rc < 0) {
if (rc == -ENODATA) {
} else if (rc == -EOPNOTSUPP) {
rc = 0;
} else {
} else if (rc == -EOPNOTSUPP) {
rc = 0;
} else {
+ if (rc == -ERANGE &&
+ exp_connect_large_acl(info->mti_exp) &&
+ buf->lb_buf != info->mti_big_acl) {
+ if (info->mti_big_acl == NULL) {
+ OBD_ALLOC_LARGE(info->mti_big_acl,
+ mdt->mdt_max_ea_size);
+ if (info->mti_big_acl == NULL) {
+ CERROR("%s: unable to grow "
+ DFID" ACL buffer\n",
+ mdt_obd_name(mdt),
+ PFID(mdt_object_fid(o)));
+ RETURN(-ENOMEM);
+ }
+
+ info->mti_big_aclsize =
+ mdt->mdt_max_ea_size;
+ }
+
+ CDEBUG(D_INODE, "%s: grow the "DFID
+ " ACL buffer to size %d\n",
+ mdt_obd_name(mdt),
+ PFID(mdt_object_fid(o)),
+ mdt->mdt_max_ea_size);
+
+ buf->lb_buf = info->mti_big_acl;
+ buf->lb_len = info->mti_big_aclsize;
+
+ goto again;
+ }
+
CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(o)), rc);
+ mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
+ if (buf->lb_buf == info->mti_big_acl)
+ info->mti_big_acl_used = 1;
+
rc = nodemap_map_acl(nodemap, buf->lb_buf,
rc, NODEMAP_FS_TO_CLIENT);
/* if all ACLs mapped out, rc is still >= 0 */
if (rc < 0) {
CERROR("%s: nodemap_map_acl unable to parse "DFID
rc = nodemap_map_acl(nodemap, buf->lb_buf,
rc, NODEMAP_FS_TO_CLIENT);
/* if all ACLs mapped out, rc is still >= 0 */
if (rc < 0) {
CERROR("%s: nodemap_map_acl unable to parse "DFID
- " ACL: rc = %d\n", mdt_obd_name(info->mti_mdt),
+ " ACL: rc = %d\n", mdt_obd_name(mdt),
PFID(mdt_object_fid(o)), rc);
} else {
repbody->mbo_aclsize = rc;
PFID(mdt_object_fid(o)), rc);
} else {
repbody->mbo_aclsize = rc;
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
+ /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+ * by default. If the target object has more ACL entries, then
+ * enlarge the buffer when necessary. */
+ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
rc = req_capsule_server_pack(pill);
if (unlikely(rc != 0))
GOTO(out, rc = err_serious(rc));
rc = req_capsule_server_pack(pill);
if (unlikely(rc != 0))
GOTO(out, rc = err_serious(rc));
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
+ /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+ * by default. If the target object has more ACL entries, then
+ * enlarge the buffer when necessary. */
+ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
rc = req_capsule_server_pack(pill);
if (rc != 0) {
CERROR("Can't pack response, rc %d\n", rc);
rc = req_capsule_server_pack(pill);
if (rc != 0) {
CERROR("Can't pack response, rc %d\n", rc);
req_capsule_set_size(pill, &RMF_LOGCOOKIES,
RCL_SERVER, 0);
req_capsule_set_size(pill, &RMF_LOGCOOKIES,
RCL_SERVER, 0);
+ /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+ * by default. If the target object has more ACL entries, then
+ * enlarge the buffer when necessary. */
+ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
rc = req_capsule_server_pack(pill);
}
RETURN(rc);
rc = req_capsule_server_pack(pill);
}
RETURN(rc);
info->mti_cross_ref = 0;
info->mti_opdata = 0;
info->mti_big_lmm_used = 0;
info->mti_cross_ref = 0;
info->mti_opdata = 0;
info->mti_big_lmm_used = 0;
+ info->mti_big_acl_used = 0;
info->mti_spec.no_create = 0;
info->mti_spec.sp_rm_entry = 0;
info->mti_spec.no_create = 0;
info->mti_spec.sp_rm_entry = 0;
info->mti_big_lmm = NULL;
info->mti_big_lmmsize = 0;
}
info->mti_big_lmm = NULL;
info->mti_big_lmmsize = 0;
}
+
+ if (info->mti_big_acl) {
+ OBD_FREE_LARGE(info->mti_big_acl, info->mti_big_aclsize);
+ info->mti_big_acl = NULL;
+ info->mti_big_aclsize = 0;
+ }
+
const struct ldlm_request *mti_dlm_req;
__u32 mti_has_trans:1, /* has txn already? */
const struct ldlm_request *mti_dlm_req;
__u32 mti_has_trans:1, /* has txn already? */
+ mti_cross_ref:1,
+ /* big_lmm buffer was used and must be used in reply */
+ mti_big_lmm_used:1,
+ mti_big_acl_used:1;
/* opdata for mdt_reint_open(), has the same as
* ldlm_reply:lock_policy_res1. mdt_update_last_rcvd() stores this
/* opdata for mdt_reint_open(), has the same as
* ldlm_reply:lock_policy_res1. mdt_update_last_rcvd() stores this
struct lu_name mti_name;
/* per-thread values, can be re-used, may be vmalloc'd */
void *mti_big_lmm;
struct lu_name mti_name;
/* per-thread values, can be re-used, may be vmalloc'd */
void *mti_big_lmm;
- /* big_lmm buffer was used and must be used in reply */
- int mti_big_lmm_used;
/* should be enough to fit lustre_mdt_attrs */
char mti_xattr_buf[128];
struct ldlm_enqueue_info mti_einfo;
/* should be enough to fit lustre_mdt_attrs */
char mti_xattr_buf[128];
struct ldlm_enqueue_info mti_einfo;
acl_size = body->mbo_aclsize;
acl_size = body->mbo_aclsize;
- /* this replay - not send info to client */
+ /* this replay - not send info to client */
if (info->mti_spec.no_create) {
md_size = 0;
acl_size = 0;
if (info->mti_spec.no_create) {
md_size = 0;
acl_size = 0;
req_capsule_server_get(pill, &RMF_MDT_MD));
req_capsule_shrink(pill, &RMF_MDT_MD, 0, RCL_SERVER);
}
req_capsule_server_get(pill, &RMF_MDT_MD));
req_capsule_shrink(pill, &RMF_MDT_MD, 0, RCL_SERVER);
}
- } else if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
- req_capsule_shrink(pill, &RMF_MDT_MD, md_size, RCL_SERVER);
- }
+ } else if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
+ req_capsule_shrink(pill, &RMF_MDT_MD, md_size, RCL_SERVER);
+ }
- if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
- req_capsule_shrink(pill, &RMF_ACL, acl_size, RCL_SERVER);
- else if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
- req_capsule_shrink(pill, &RMF_LOGCOOKIES,
- acl_size, RCL_SERVER);
+ if (info->mti_big_acl_used) {
+ if (acl_size == 0)
+ info->mti_big_acl_used = 0;
+ else
+ req_capsule_shrink(pill, &RMF_ACL, 0, RCL_SERVER);
+ } else if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER)) {
+ req_capsule_shrink(pill, &RMF_ACL, acl_size, RCL_SERVER);
+ } else if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) {
+ req_capsule_shrink(pill, &RMF_LOGCOOKIES, acl_size, RCL_SERVER);
+ }
- if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_SERVER) &&
+ if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_SERVER) &&
!(body->mbo_valid & OBD_MD_FLMDSCAPA))
req_capsule_shrink(pill, &RMF_CAPA1, 0, RCL_SERVER);
!(body->mbo_valid & OBD_MD_FLMDSCAPA))
req_capsule_shrink(pill, &RMF_CAPA1, 0, RCL_SERVER);
memcpy(lmm, info->mti_attr.ma_lmv,
info->mti_attr.ma_lmv_size);
}
memcpy(lmm, info->mti_attr.ma_lmv,
info->mti_attr.ma_lmv_size);
}
- }
- /* update mdt_max_mdsize so clients will be aware about that */
- if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size)
- info->mti_mdt->mdt_max_mdsize =
- info->mti_attr.ma_lmm_size;
+ }
+
+ /* update mdt_max_mdsize so clients will be aware about that */
+ if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size)
+ info->mti_mdt->mdt_max_mdsize =
+ info->mti_attr.ma_lmm_size;
info->mti_big_lmm_used = 0;
info->mti_big_lmm_used = 0;
+ }
+
+ if (info->mti_big_acl_used) {
+ CDEBUG(D_INFO, "Enlarge reply ACL buffer to %d bytes\n",
+ acl_size);
+
+ rc = req_capsule_server_grow(pill, &RMF_ACL, acl_size);
+ if (rc) {
+ body->mbo_valid &= ~OBD_MD_FLACL;
+ } else {
+ void *acl = req_capsule_server_get(pill, &RMF_ACL);
+
+ memcpy(acl, info->mti_big_acl, acl_size);
+ }
+
+ info->mti_big_acl_used = 0;
+ }
+
+ RETURN(rc);
+ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER,
info->mti_body->mbo_eadatasize == 0 ? 0 : size);
rc = req_capsule_server_pack(pill);
req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER,
info->mti_body->mbo_eadatasize == 0 ? 0 : size);
rc = req_capsule_server_pack(pill);
strcmp(xattr_name, XATTR_NAME_ACL_DEFAULT) == 0)) {
struct lu_nodemap *nodemap;
strcmp(xattr_name, XATTR_NAME_ACL_DEFAULT) == 0)) {
struct lu_nodemap *nodemap;
- /* currently lustre limit acl access size */
- if (xattr_len > LUSTRE_POSIX_ACL_MAX_SIZE)
+ if ((xattr_len > info->mti_mdt->mdt_max_ea_size) ||
+ (!exp_connect_large_acl(exp) &&
+ xattr_len > LUSTRE_POSIX_ACL_MAX_SIZE_OLD))
GOTO(out, rc = -ERANGE);
nodemap = nodemap_get_from_exp(exp);
GOTO(out, rc = -ERANGE);
nodemap = nodemap_get_from_exp(exp);
int eti_big_lmmsize;
char eti_name[ETI_NAME_LEN];
struct lu_buf eti_buf;
int eti_big_lmmsize;
char eti_name[ETI_NAME_LEN];
struct lu_buf eti_buf;
- char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
+ /* If we want to test large ACL, then need to enlarge the buffer. */
+ char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE_OLD];
struct req_msg_field RMF_EAVALS = DEFINE_MSGF("eavals", 0, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_EAVALS);
struct req_msg_field RMF_EAVALS = DEFINE_MSGF("eavals", 0, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_EAVALS);
-struct req_msg_field RMF_ACL =
- DEFINE_MSGF("acl", RMF_F_NO_SIZE_CHECK,
- LUSTRE_POSIX_ACL_MAX_SIZE, NULL, NULL);
+struct req_msg_field RMF_ACL = DEFINE_MSGF("acl", 0, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_ACL);
/* FIXME: this should be made to use RMF_F_STRUCT_ARRAY */
EXPORT_SYMBOL(RMF_ACL);
/* FIXME: this should be made to use RMF_F_STRUCT_ARRAY */
OBD_CONNECT_ACL);
LASSERTF(OBD_CONNECT_XATTR == 0x100ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_XATTR);
OBD_CONNECT_ACL);
LASSERTF(OBD_CONNECT_XATTR == 0x100ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_XATTR);
- LASSERTF(OBD_CONNECT_CROW == 0x200ULL, "found 0x%.16llxULL\n",
- OBD_CONNECT_CROW);
+ LASSERTF(OBD_CONNECT_LARGE_ACL == 0x200ULL, "found 0x%.16llxULL\n",
+ OBD_CONNECT_LARGE_ACL);
LASSERTF(OBD_CONNECT_TRUNCLOCK == 0x400ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_TRUNCLOCK);
LASSERTF(OBD_CONNECT_TRANSNO == 0x800ULL, "found 0x%.16llxULL\n",
LASSERTF(OBD_CONNECT_TRUNCLOCK == 0x400ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_TRUNCLOCK);
LASSERTF(OBD_CONNECT_TRANSNO == 0x800ULL, "found 0x%.16llxULL\n",
#include <obd_cksum.h>
#include <lustre_lfsck.h>
#include <lustre_nodemap.h>
#include <obd_cksum.h>
#include <lustre_lfsck.h>
#include <lustre_nodemap.h>
#include "tgt_internal.h"
#include "tgt_internal.h"
RCL_SERVER))
req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES,
RCL_SERVER, 0);
RCL_SERVER))
req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES,
RCL_SERVER, 0);
+ if (req_capsule_has_field(tsi->tsi_pill, &RMF_ACL, RCL_SERVER))
+ req_capsule_set_size(tsi->tsi_pill,
+ &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
rc = req_capsule_server_pack(tsi->tsi_pill);
}
rc = req_capsule_server_pack(tsi->tsi_pill);
}
-test_48() { # bug 17636
- reformat
+test_48() { # bz-17636 LU-7473
+ local count
+
setup_noconfig
check_mount || error "check_mount failed"
setup_noconfig
check_mount || error "check_mount failed"
$GETSTRIPE $MOUNT/widestripe ||
error "$GETSTRIPE $MOUNT/widestripe failed"
$GETSTRIPE $MOUNT/widestripe ||
error "$GETSTRIPE $MOUNT/widestripe failed"
- trap cleanup_48 EXIT ERR
+ # In the future, we may introduce more EAs, such as selinux, enlarged
+ # LOV EA, and so on. These EA will use some EA space that is shared by
+ # ACL entries. So here we only check some reasonable ACL entries count,
+ # instead of the max number that is calculated from the max_ea_size.
+ if [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.8.57) ];
+ then
+ count=28 # hard coded of RPC protocol
+ elif [ $(facet_fstype $SINGLEMDS) != ldiskfs ]; then
+ count=4000 # max_num 4091 max_ea_size = 32768
+ elif ! large_xattr_enabled; then
+ count=450 # max_num 497 max_ea_size = 4012
+ else
+ count=4500 # max_num 8187 max_ea_size = 1048492
+ # not create too much (>5000) to save test time
+ fi
- # fill acl buffer for avoid expand lsm to them
- getent passwd | awk -F : '{ print "u:"$1":rwx" }' | while read acl; do
- setfacl -m $acl $MOUNT/widestripe
+ echo "It is expected to hold at least $count ACL entries"
+ trap cleanup_48 EXIT ERR
+ for ((i = 0; i < $count; i++)) do
+ setfacl -m u:$((i + 100)):rw $MOUNT/widestripe ||
+ error "Fail to setfacl for $MOUNT/widestripe at $i"
stat $MOUNT/widestripe || error "stat $MOUNT/widestripe failed"
stat $MOUNT/widestripe || error "stat $MOUNT/widestripe failed"
+ local r_count=$(getfacl $MOUNT/widestripe | grep "user:" | wc -l)
+ count=$((count + 1)) # for the entry "user::rw-"
+
+ [ $count -eq $r_count ] ||
+ error "Expected ACL entries $count, but got $r_count"
CHECK_DEFINE_64X(OBD_CONNECT_REQPORTAL);
CHECK_DEFINE_64X(OBD_CONNECT_ACL);
CHECK_DEFINE_64X(OBD_CONNECT_XATTR);
CHECK_DEFINE_64X(OBD_CONNECT_REQPORTAL);
CHECK_DEFINE_64X(OBD_CONNECT_ACL);
CHECK_DEFINE_64X(OBD_CONNECT_XATTR);
- CHECK_DEFINE_64X(OBD_CONNECT_CROW);
+ CHECK_DEFINE_64X(OBD_CONNECT_LARGE_ACL);
CHECK_DEFINE_64X(OBD_CONNECT_TRUNCLOCK);
CHECK_DEFINE_64X(OBD_CONNECT_TRANSNO);
CHECK_DEFINE_64X(OBD_CONNECT_IBITS);
CHECK_DEFINE_64X(OBD_CONNECT_TRUNCLOCK);
CHECK_DEFINE_64X(OBD_CONNECT_TRANSNO);
CHECK_DEFINE_64X(OBD_CONNECT_IBITS);
OBD_CONNECT_ACL);
LASSERTF(OBD_CONNECT_XATTR == 0x100ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_XATTR);
OBD_CONNECT_ACL);
LASSERTF(OBD_CONNECT_XATTR == 0x100ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_XATTR);
- LASSERTF(OBD_CONNECT_CROW == 0x200ULL, "found 0x%.16llxULL\n",
- OBD_CONNECT_CROW);
+ LASSERTF(OBD_CONNECT_LARGE_ACL == 0x200ULL, "found 0x%.16llxULL\n",
+ OBD_CONNECT_LARGE_ACL);
LASSERTF(OBD_CONNECT_TRUNCLOCK == 0x400ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_TRUNCLOCK);
LASSERTF(OBD_CONNECT_TRANSNO == 0x800ULL, "found 0x%.16llxULL\n",
LASSERTF(OBD_CONNECT_TRUNCLOCK == 0x400ULL, "found 0x%.16llxULL\n",
OBD_CONNECT_TRUNCLOCK);
LASSERTF(OBD_CONNECT_TRANSNO == 0x800ULL, "found 0x%.16llxULL\n",