* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014 Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* value which shouldn't be exceeded.
*
* \param[in] ofd OFD device
- * \param[in] int number of updates in the batch
+ * \param[in] batch number of updates in the batch
*
* \retval \a batch limited by ofd_device::ofd_precreate_batch
*/
* \retval pointer to the requested ofd_seq structure
* \retval NULL if ofd_seq is not found
*/
-struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, obd_seq seq)
+struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, u64 seq)
{
struct ofd_seq *oseq;
if (atomic_dec_and_test(&oseq->os_refc)) {
LASSERT(list_empty(&oseq->os_list));
LASSERT(oseq->os_lastid_obj != NULL);
- lu_object_put(env, &oseq->os_lastid_obj->do_lu);
+ dt_object_put(env, oseq->os_lastid_obj);
OBD_FREE_PTR(oseq);
}
}
/**
* Get last object ID for the given sequence.
*
- * \param[in] ofd_seq OFD sequence structure
+ * \param[in] oseq OFD sequence structure
*
* \retval the last object ID for this sequence
*/
-obd_id ofd_seq_last_oid(struct ofd_seq *oseq)
+u64 ofd_seq_last_oid(struct ofd_seq *oseq)
{
- obd_id id;
+ u64 id;
spin_lock(&oseq->os_last_oid_lock);
id = ostid_id(&oseq->os_oi);
* \param[in] oseq OFD sequence
* \param[in] id the new OID to set
*/
-void ofd_seq_last_oid_set(struct ofd_seq *oseq, obd_id id)
+void ofd_seq_last_oid_set(struct ofd_seq *oseq, u64 id)
{
spin_lock(&oseq->os_last_oid_lock);
- if (likely(ostid_id(&oseq->os_oi) < id))
- ostid_set_id(&oseq->os_oi, id);
+ if (likely(ostid_id(&oseq->os_oi) < id)) {
+ if (ostid_set_id(&oseq->os_oi, id)) {
+ CERROR("Bad %llu to set " DOSTID "\n",
+ (unsigned long long)id, POSTID(&oseq->os_oi));
+ }
+ }
spin_unlock(&oseq->os_last_oid_lock);
}
struct ofd_seq *oseq)
{
struct ofd_thread_info *info = ofd_info(env);
- obd_id tmp;
+ u64 tmp;
struct dt_object *obj = oseq->os_lastid_obj;
struct thandle *th;
int rc;
ENTRY;
+ if (ofd->ofd_osd->dd_rdonly)
+ RETURN(0);
+
tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
info->fti_buf.lb_buf = &tmp;
*
* LWP is lightweight proxy - simplified connection between
* servers. It is used for FID Location Database (FLDB) and
- * sequence (SEQ) client-server interations.
+ * sequence (SEQ) client-server interactions.
*
* This function is used during server cleanup process to free
* LWP items that were previously set up upon OFD start.
*/
void ofd_seqs_free(const struct lu_env *env, struct ofd_device *ofd)
{
- struct ofd_seq *oseq;
- struct ofd_seq *tmp;
- struct list_head dispose;
+ struct ofd_seq *oseq;
+ struct ofd_seq *tmp;
+ LIST_HEAD(dispose);
- INIT_LIST_HEAD(&dispose);
write_lock(&ofd->ofd_seq_list_lock);
list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list)
list_move(&oseq->os_list, &dispose);
write_unlock(&ofd->ofd_seq_list_lock);
while (!list_empty(&dispose)) {
- oseq = container_of0(dispose.next, struct ofd_seq, os_list);
+ oseq = container_of(dispose.next, struct ofd_seq, os_list);
list_del_init(&oseq->os_list);
ofd_seq_put(env, oseq);
}
* \retval ERR_PTR pointer on error
*/
struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
- obd_seq seq)
+ u64 seq)
{
struct ofd_thread_info *info = ofd_info(env);
struct ofd_seq *oseq = NULL;
struct dt_object *dob;
- obd_id lastid;
+ u64 lastid;
int rc;
ENTRY;
mutex_init(&oseq->os_create_lock);
spin_lock_init(&oseq->os_last_oid_lock);
ostid_set_seq(&oseq->os_oi, seq);
+ oseq->os_last_id_synced = 0;
atomic_set(&oseq->os_refc, 1);
+ atomic_set(&oseq->os_precreate_in_progress, 0);
- rc = dt_attr_get(env, dob, &info->fti_attr, BYPASS_CAPA);
+ rc = dt_attr_get(env, dob, &info->fti_attr);
if (rc)
GOTO(cleanup, rc);
if (info->fti_attr.la_size == 0) {
/* object is just created, initialize last id */
- ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OFD_SET_OID))
+ ofd_seq_last_oid_set(oseq, 0xffffff00);
+ else
+ ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
ofd_seq_last_oid_write(env, ofd, oseq);
} else if (info->fti_attr.la_size == sizeof(lastid)) {
info->fti_off = 0;
}
ofd_seq_last_oid_set(oseq, le64_to_cpu(lastid));
} else {
- CERROR("%s: corrupted size "LPU64" LAST_ID of seq "LPX64"\n",
+ CERROR("%s: corrupted size %llu LAST_ID of seq %#llx\n",
ofd_name(ofd), (__u64)info->fti_attr.la_size, seq);
GOTO(cleanup, rc = -EINVAL);
}
{
int rc;
+ rwlock_init(&ofd->ofd_seq_list_lock);
+ INIT_LIST_HEAD(&ofd->ofd_seq_list);
+ ofd->ofd_seq_count = 0;
+
rc = ofd_fid_init(env, ofd);
if (rc != 0) {
CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
- return rc;
+ GOTO(out, rc);
}
rc = ofd_fld_init(env, ofd_name(ofd), ofd);
if (rc < 0) {
CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
- return rc;
+ GOTO(out_fid, rc);
}
rc = ofd_register_seq_exp(ofd);
if (rc < 0) {
CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
- return rc;
+ GOTO(out_fld, rc);
}
- rwlock_init(&ofd->ofd_seq_list_lock);
- INIT_LIST_HEAD(&ofd->ofd_seq_list);
- ofd->ofd_seq_count = 0;
+ RETURN(0);
+
+out_fld:
+ ofd_fld_fini(env, ofd);
+out_fid:
+ ofd_fid_fini(env, ofd);
+out:
return rc;
}
rc = ofd_seqs_init(env, ofd);
if (rc)
- GOTO(out_hc, rc);
+ GOTO(out, rc);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
- RETURN (-ENOENT);
+ GOTO(out_seqs, rc = -ENOENT);
lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
memset(&info->fti_attr, 0, sizeof(info->fti_attr));
fo = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
&info->fti_dof, &info->fti_attr);
if (IS_ERR(fo))
- GOTO(out, rc = PTR_ERR(fo));
+ GOTO(out_seqs, rc = PTR_ERR(fo));
ofd->ofd_health_check_file = fo;
RETURN(0);
-out_hc:
- lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
+
+out_seqs:
+ ofd_seqs_fini(env, ofd);
out:
return rc;
}
ofd_name(ofd), rc);
if (ofd->ofd_health_check_file) {
- lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
+ dt_object_put(env, ofd->ofd_health_check_file);
ofd->ofd_health_check_file = NULL;
}