static int
ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
- struct ll_inode_info *lli = ll_i2info(inode);
- struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
- struct ll_grouplock grouplock;
- int rc;
- ENTRY;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct cl_object *obj = lli->lli_clob;
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_grouplock grouplock;
+ int rc;
+ ENTRY;
if (arg == 0) {
CWARN("group id for group lock must not be 0\n");
LASSERT(fd->fd_grouplock.lg_lock == NULL);
spin_unlock(&lli->lli_lock);
+ /**
+ * XXX: group lock needs to protect all OST objects while PFL
+ * can add new OST objects during the IO, so we'd instantiate
+ * all OST objects before getting its group lock.
+ */
+ if (obj) {
+ struct lu_env *env;
+ __u16 refcheck;
+ struct cl_layout cl = {
+ .cl_is_composite = false,
+ };
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
+ rc = cl_object_layout_get(env, obj, &cl);
+ if (!rc && cl.cl_is_composite)
+ rc = ll_layout_write_intent(inode, 0, OBD_OBJECT_EOF);
+
+ cl_env_put(env, &refcheck);
+ if (rc)
+ RETURN(rc);
+ }
+
rc = cl_get_grouplock(ll_i2info(inode)->lli_clob,
arg, (file->f_flags & O_NONBLOCK), &grouplock);
if (rc)
/* Make sure defined layout covers the requested write range. */
lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1];
- if ((lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
- lod_comp->llc_extent.e_end < layout->li_end)) {
+ if (lo->ldo_comp_cnt > 1 &&
+ lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
+ lod_comp->llc_extent.e_end < layout->li_end) {
CDEBUG(replay ? D_ERROR : D_LAYOUT,
"%s: the defined layout [0, %#llx) does not covers "
"the write range [%#llx, %#llx).\n",
}
+ if (dest_gid != 0) {
+ rc = llapi_group_unlock(fd_out, dest_gid);
+ ASSERTF(rc == 0, "cannot clear group lock %d for '%s': %s",
+ dest_gid, dest, strerror(-rc));
+ }
+ if (source_gid != 0) {
+ rc = llapi_group_unlock(fd_in, source_gid);
+ ASSERTF(rc == 0, "cannot clear group lock %d for '%s': %s",
+ source_gid, source, strerror(-rc));
+ }
+
close(fd_out);
close(fd_in);