unsigned int opd_reserved_mb_low;
unsigned int opd_reserved_ino_high;
unsigned int opd_reserved_ino_low;
+
+ wait_queue_head_t opd_out_waitq;
bool opd_cleanup_orphans_done;
bool opd_force_creation;
};
unsigned int opo_reserved:1,
opo_non_exist:1,
opo_stale:1,
- opo_destroyed:1;
+ opo_destroyed:1,
+ opo_creating:1; /* create in progress */
/* read/write lock for md osp object */
struct rw_semaphore opo_sem;
/* to implement in-flight invalidation */
atomic_t opo_invalidate_seq;
struct rw_semaphore opo_invalidate_sem;
+ atomic_t opo_writes_in_flight;
};
extern const struct lu_object_operations osp_lu_obj_ops;
struct osp_object *obj,
void *data, int index, int rc)
{
+ struct osp_device *osp = lu2osp_dev(obj->opo_obj.do_lu.lo_dev);
+
+ spin_lock(&obj->opo_lock);
if (rc != 0 && rc != -EEXIST) {
obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
obj->opo_non_exist = 1;
}
+ obj->opo_creating = 0;
+ spin_unlock(&obj->opo_lock);
/*
* invalidate opo cache for the object after the object is created, so
*/
osp_obj_invalidate_cache(obj);
+ /*
+ * Reads from objects being created are exceptional - they happen
+ * only during recovery, when remote llog update fetching can race
+ * with orphan cleanup. So don't waste memory by adding a wait
+ * queue to every osp object; wake the per-device queue instead.
+ */
+ wake_up_all(&osp->opd_out_waitq);
+
return 0;
}
if (rc < 0)
GOTO(out, rc);
+ spin_lock(&obj->opo_lock);
+ obj->opo_creating = 1;
dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
dt2osp_obj(dt)->opo_non_exist = 0;
obj->opo_stale = 0;
+ spin_unlock(&obj->opo_lock);
obj->opo_attr = *attr;
out:
struct osp_object *obj,
void *data, int index, int rc)
{
+ struct osp_device *osp = lu2osp_dev(obj->opo_obj.do_lu.lo_dev);
+
if (rc) {
CDEBUG(D_HA, "error "DFID": rc = %d\n",
PFID(lu_object_fid(&obj->opo_obj.do_lu)), rc);
obj->opo_stale = 1;
spin_unlock(&obj->opo_lock);
}
+ if (atomic_dec_and_test(&obj->opo_writes_in_flight))
+ wake_up_all(&osp->opd_out_waitq);
return 0;
}
}
spin_unlock(&obj->opo_lock);
+ atomic_inc(&obj->opo_writes_in_flight);
+
RETURN(buf->lb_len);
}
orr_dst->orr_offset = le64_to_cpu(orr_dst->orr_offset);
}
+/* Return 1 while a create RPC is still in flight for @obj, else 0.
+ *
+ * opo_creating is a one-bit bitfield that is set and cleared under
+ * opo_lock, so it must be read under the same lock to avoid a torn
+ * read against writes to the neighbouring bits of the same word.
+ */
+static int osp_md_check_creating(struct osp_object *obj)
+{
+ int rc;
+
+ spin_lock(&obj->opo_lock);
+ rc = obj->opo_creating;
+ spin_unlock(&obj->opo_lock);
+
+ return rc;
+}
static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
struct lu_buf *rbuf, loff_t *pos)
if (dt2osp_obj(dt)->opo_destroyed)
RETURN(-ENOENT);
+ wait_event_idle(osp->opd_out_waitq,
+ !atomic_read(&dt2osp_obj(dt)->opo_writes_in_flight) &&
+ osp_md_check_creating(dt2osp_obj(dt)) == 0);
+
/* Because it needs send the update buffer right away,
* just create an update buffer, instead of attaching the
* update_remote list of the thandle. */