int rc;
ENTRY;
- rc = osp_prep_update_req(env, osp, update->ur_buf,
- UPDATE_BUFFER_SIZE, &req);
+ rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
+ &req);
if (rc)
RETURN(rc);
CFS_INIT_LIST_HEAD(&update->ur_list);
update->ur_dt = dt;
-
+ update->ur_batchid = 0;
update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
update->ur_buf->ub_count = 0;
*/
static int osp_insert_update(const struct lu_env *env,
struct update_request *update, int op,
- struct lu_fid *fid, int count, int *lens,
- char **bufs)
+ struct lu_fid *fid, int count,
+ int *lens, char **bufs)
{
struct update_buf *ubuf = update->ur_buf;
struct update *obj_update;
/* fill the update into the update buffer */
fid_cpu_to_le(&obj_update->u_fid, fid);
obj_update->u_type = cpu_to_le32(op);
+ obj_update->u_batchid = update->ur_batchid;
for (i = 0; i < count; i++)
obj_update->u_lens[i] = cpu_to_le32(lens[i]);
return NULL;
}
+static inline void osp_md_add_update_batchid(struct update_request *update)
+{
+ update->ur_batchid++;
+}
+
/**
* Find one loc in th_dev/dev_obj_update for the update,
* Because only one thread can access this thandle, no need
int buf_count;
int rc;
+
update = osp_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
buf_count++;
}
+ if (lu_object_exists(&dt->do_lu)) {
+ /* If the object already exists, we needs to destroy
+ * this orphan object first.
+ *
+ * The scenario might happen in this case
+ *
+ * 1. client send remote create to MDT0.
+ * 2. MDT0 send create update to MDT1.
+ * 3. MDT1 finished create synchronously.
+ * 4. MDT0 failed and reboot.
+ * 5. client resend remote create to MDT0.
+ * 6. MDT0 tries to resend create update to MDT1,
+ * but find the object already exists
+ */
+ CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
+ dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
+
+ rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
+ NULL, NULL);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
+ /* decrease for ".." */
+ rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
+ 0, NULL, NULL);
+ if (rc != 0)
+ GOTO(out, rc);
+ }
+
+ rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
+ NULL);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
+ /* Increase batchid to add this orphan object deletion
+ * to separate transaction */
+ osp_md_add_update_batchid(update);
+ }
+
rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
bufs);
- if (rc) {
+out:
+ if (rc)
CERROR("%s: Insert update error: rc = %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name, rc);
- }
return rc;
}
char *bufs[3] = {(char *)name, (char *)buf->lb_buf };
int rc;
+ LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
update = osp_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
LASSERT(size > 0 && size < CFS_PAGE_SIZE);
LASSERT(ea_buf != NULL);
- buf->lb_len = size;
- memcpy(buf->lb_buf, ea_buf, size);
+ rc = size;
+ if (buf->lb_buf != NULL)
+ memcpy(buf->lb_buf, ea_buf, size);
out:
if (req != NULL)
ptlrpc_req_finished(req);
RETURN(rc);
}
-struct dt_object_operations osp_md_obj_ops = {
- .do_read_lock = osp_md_object_read_lock,
- .do_write_lock = osp_md_object_write_lock,
- .do_read_unlock = osp_md_object_read_unlock,
- .do_write_unlock = osp_md_object_write_unlock,
- .do_write_locked = osp_md_object_write_locked,
- .do_declare_create = osp_md_declare_object_create,
- .do_create = osp_md_object_create,
- .do_declare_ref_add = osp_md_declare_ref_add,
- .do_ref_add = osp_md_object_ref_add,
- .do_declare_ref_del = osp_md_declare_object_ref_del,
- .do_ref_del = osp_md_object_ref_del,
- .do_declare_destroy = osp_md_declare_object_destroy,
- .do_destroy = osp_md_object_destroy,
- .do_ah_init = osp_md_ah_init,
- .do_attr_get = osp_md_attr_get,
- .do_declare_attr_set = osp_md_declare_attr_set,
- .do_attr_set = osp_md_attr_set,
- .do_declare_xattr_set = osp_md_declare_xattr_set,
- .do_xattr_set = osp_md_xattr_set,
- .do_xattr_get = osp_md_xattr_get,
- .do_index_try = osp_md_index_try,
-};
-
static int osp_md_object_lock(const struct lu_env *env,
struct dt_object *dt,
struct lustre_handle *lh,
return rc == ELDLM_OK ? 0 : -EIO;
}
-struct dt_lock_operations osp_md_lock_ops = {
+struct dt_object_operations osp_md_obj_ops = {
+ .do_read_lock = osp_md_object_read_lock,
+ .do_write_lock = osp_md_object_write_lock,
+ .do_read_unlock = osp_md_object_read_unlock,
+ .do_write_unlock = osp_md_object_write_unlock,
+ .do_write_locked = osp_md_object_write_locked,
+ .do_declare_create = osp_md_declare_object_create,
+ .do_create = osp_md_object_create,
+ .do_declare_ref_add = osp_md_declare_ref_add,
+ .do_ref_add = osp_md_object_ref_add,
+ .do_declare_ref_del = osp_md_declare_object_ref_del,
+ .do_ref_del = osp_md_object_ref_del,
+ .do_declare_destroy = osp_md_declare_object_destroy,
+ .do_destroy = osp_md_object_destroy,
+ .do_ah_init = osp_md_ah_init,
+ .do_attr_get = osp_md_attr_get,
+ .do_declare_attr_set = osp_md_declare_attr_set,
+ .do_attr_set = osp_md_attr_set,
+ .do_declare_xattr_set = osp_md_declare_xattr_set,
+ .do_xattr_set = osp_md_xattr_set,
+ .do_xattr_get = osp_md_xattr_get,
+ .do_index_try = osp_md_index_try,
.do_object_lock = osp_md_object_lock,
};
-