int rc;
ENTRY;
- rc = osp_prep_update_req(env, osp, update->ur_buf,
- UPDATE_BUFFER_SIZE, &req);
+ rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
+ &req);
if (rc)
RETURN(rc);
if (!update)
return ERR_PTR(-ENOMEM);
- OBD_ALLOC(update->ur_buf, UPDATE_BUFFER_SIZE);
+ OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
if (update->ur_buf == NULL) {
OBD_FREE_PTR(update);
return ERR_PTR(-ENOMEM);
CFS_INIT_LIST_HEAD(&update->ur_list);
update->ur_dt = dt;
-
+ update->ur_batchid = 0;
update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
update->ur_buf->ub_count = 0;
cfs_list_del(&update->ur_list);
if (update->ur_buf != NULL)
- OBD_FREE(update->ur_buf, UPDATE_BUFFER_SIZE);
+ OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
OBD_FREE_PTR(update);
return;
*/
static int osp_insert_update(const struct lu_env *env,
struct update_request *update, int op,
- struct lu_fid *fid, int count, int *lens,
- char **bufs)
+ struct lu_fid *fid, int count,
+ int *lens, char **bufs)
{
struct update_buf *ubuf = update->ur_buf;
struct update *obj_update;
/* fill the update into the update buffer */
fid_cpu_to_le(&obj_update->u_fid, fid);
obj_update->u_type = cpu_to_le32(op);
+ obj_update->u_batchid = update->ur_batchid;
for (i = 0; i < count; i++)
obj_update->u_lens[i] = cpu_to_le32(lens[i]);
return NULL;
}
+static inline void osp_md_add_update_batchid(struct update_request *update)
+{
+ update->ur_batchid++;
+}
+
/**
* Find one loc in th_dev/dev_obj_update for the update,
* Because only one thread can access this thandle, no need
return -EPROTO;
obdo_le_to_cpu(wobdo, wobdo);
- lustre_get_wire_obdo(lobdo, wobdo);
+ lustre_get_wire_obdo(NULL, lobdo, wobdo);
la_from_obdo(attr, lobdo, lobdo->o_valid);
return 0;
int buf_count;
int rc;
+
update = osp_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
osi->osi_obdo.o_valid = 0;
LASSERT(S_ISDIR(attr->la_mode));
obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
- lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+ lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
bufs[0] = (char *)&osi->osi_obdo;
buf_count++;
}
+ if (lu_object_exists(&dt->do_lu)) {
+ /* If the object already exists, we needs to destroy
+ * this orphan object first.
+ *
+ * The scenario might happen in this case
+ *
+ * 1. client send remote create to MDT0.
+ * 2. MDT0 send create update to MDT1.
+ * 3. MDT1 finished create synchronously.
+ * 4. MDT0 failed and reboot.
+ * 5. client resend remote create to MDT0.
+ * 6. MDT0 tries to resend create update to MDT1,
+ * but find the object already exists
+ */
+ CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
+ dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
+
+ rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
+ NULL, NULL);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
+ /* decrease for ".." */
+ rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
+ 0, NULL, NULL);
+ if (rc != 0)
+ GOTO(out, rc);
+ }
+
+ rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
+ NULL);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
+ /* Increase batchid to add this orphan object deletion
+ * to separate transaction */
+ osp_md_add_update_batchid(update);
+ }
+
rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
bufs);
- if (rc) {
+out:
+ if (rc)
CERROR("%s: Insert update error: rc = %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name, rc);
- }
return rc;
}
LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
attr->la_valid);
- lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+ lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
buf = (char *)&osi->osi_obdo;
char *bufs[3] = {(char *)name, (char *)buf->lb_buf };
int rc;
+ LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
update = osp_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
LASSERT(size > 0 && size < CFS_PAGE_SIZE);
LASSERT(ea_buf != NULL);
- buf->lb_len = size;
- memcpy(buf->lb_buf, ea_buf, size);
+ rc = size;
+ if (buf->lb_buf != NULL)
+ memcpy(buf->lb_buf, ea_buf, size);
out:
if (req != NULL)
ptlrpc_req_finished(req);
static int osp_it_next(const struct lu_env *env, struct dt_it *di)
{
struct dt_object *dt = (struct dt_object *)di;
- struct osp_object *osp_obj = dt2osp_obj(dt);
- if (osp_obj->opo_empty)
+ struct osp_object *o = dt2osp_obj(dt);
+
+ if (o->opo_empty)
return 1;
+
return 0;
}
RETURN(rc);
}
+static int osp_md_object_lock(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lustre_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ void *policy)
+{
+ struct osp_thread_info *info = osp_env_info(env);
+ struct ldlm_res_id *res_id = &info->osi_resid;
+ struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+ struct osp_device *osp = dt2osp_dev(dt_dev);
+ struct ptlrpc_request *req = NULL;
+ int rc = 0;
+ __u64 flags = 0;
+ ldlm_mode_t mode;
+
+ fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
+
+ mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, res_id,
+ einfo->ei_type,
+ (ldlm_policy_data_t *)policy,
+ einfo->ei_mode, lh, 0);
+ if (mode > 0)
+ return ELDLM_OK;
+
+ req = ldlm_enqueue_pack(osp->opd_exp, 0);
+ if (IS_ERR(req))
+ RETURN(PTR_ERR(req));
+
+ rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
+ (const ldlm_policy_data_t *)policy,
+ &flags, NULL, 0, LVB_T_NONE, lh, 0);
+
+ ptlrpc_req_finished(req);
+
+ return rc == ELDLM_OK ? 0 : -EIO;
+}
+
struct dt_object_operations osp_md_obj_ops = {
.do_read_lock = osp_md_object_read_lock,
.do_write_lock = osp_md_object_write_lock,
.do_xattr_set = osp_md_xattr_set,
.do_xattr_get = osp_md_xattr_get,
.do_index_try = osp_md_index_try,
+ .do_object_lock = osp_md_object_lock,
};
-