Whamcloud - gitweb
LU-3095 build: fix 'memory corruption' errors
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
index 9df10ed..c3805f9 100644 (file)
@@ -90,8 +90,8 @@ static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
        int                     rc;
        ENTRY;
 
-       rc = osp_prep_update_req(env, osp, update->ur_buf,
-                                UPDATE_BUFFER_SIZE, &req);
+       rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
+                                &req);
        if (rc)
                RETURN(rc);
 
@@ -128,7 +128,7 @@ static struct update_request
        if (!update)
                return ERR_PTR(-ENOMEM);
 
-       OBD_ALLOC(update->ur_buf, UPDATE_BUFFER_SIZE);
+       OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
        if (update->ur_buf == NULL) {
                OBD_FREE_PTR(update);
                return ERR_PTR(-ENOMEM);
@@ -136,7 +136,7 @@ static struct update_request
 
        CFS_INIT_LIST_HEAD(&update->ur_list);
        update->ur_dt = dt;
-
+       update->ur_batchid = 0;
        update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
        update->ur_buf->ub_count = 0;
 
@@ -150,7 +150,7 @@ static void osp_destroy_update_req(struct update_request *update)
 
        cfs_list_del(&update->ur_list);
        if (update->ur_buf != NULL)
-               OBD_FREE(update->ur_buf, UPDATE_BUFFER_SIZE);
+               OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
 
        OBD_FREE_PTR(update);
        return;
@@ -160,8 +160,8 @@ int osp_trans_stop(const struct lu_env *env, struct thandle *th)
 {
        int rc = 0;
 
-       osp_destroy_update_req(th->th_current_request);
        rc = th->th_current_request->ur_rc;
+       osp_destroy_update_req(th->th_current_request);
        th->th_current_request = NULL;
 
        return rc;
@@ -197,8 +197,8 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
  */
 static int osp_insert_update(const struct lu_env *env,
                             struct update_request *update, int op,
-                            struct lu_fid *fid, int count, int *lens,
-                            char **bufs)
+                            struct lu_fid *fid, int count,
+                            int *lens, char **bufs)
 {
        struct update_buf    *ubuf = update->ur_buf;
        struct update        *obj_update;
@@ -236,6 +236,7 @@ static int osp_insert_update(const struct lu_env *env,
        /* fill the update into the update buffer */
        fid_cpu_to_le(&obj_update->u_fid, fid);
        obj_update->u_type = cpu_to_le32(op);
+       obj_update->u_batchid = update->ur_batchid;
        for (i = 0; i < count; i++)
                obj_update->u_lens[i] = cpu_to_le32(lens[i]);
 
@@ -270,6 +271,11 @@ static struct update_request
        return NULL;
 }
 
+static inline void osp_md_add_update_batchid(struct update_request *update)
+{
+       update->ur_batchid++;
+}
+
 /**
  * Find one loc in th_dev/dev_obj_update for the update,
  * Because only one thread can access this thandle, no need
@@ -316,7 +322,7 @@ static int osp_get_attr_from_req(const struct lu_env *env,
                return -EPROTO;
 
        obdo_le_to_cpu(wobdo, wobdo);
-       lustre_get_wire_obdo(lobdo, wobdo);
+       lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
        return 0;
@@ -337,6 +343,7 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        int                     buf_count;
        int                     rc;
 
+
        update = osp_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
@@ -348,7 +355,7 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        osi->osi_obdo.o_valid = 0;
        LASSERT(S_ISDIR(attr->la_mode));
        obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
-       lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+       lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
        obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
 
        bufs[0] = (char *)&osi->osi_obdo;
@@ -365,12 +372,53 @@ static int osp_md_declare_object_create(const struct lu_env *env,
                buf_count++;
        }
 
+       if (lu_object_exists(&dt->do_lu)) {
+               /* If the object already exists, we needs to destroy
+                * this orphan object first.
+                *
+                * The scenario might happen in this case
+                *
+                * 1. client send remote create to MDT0.
+                * 2. MDT0 send create update to MDT1.
+                * 3. MDT1 finished create synchronously.
+                * 4. MDT0 failed and reboot.
+                * 5. client resend remote create to MDT0.
+                * 6. MDT0 tries to resend create update to MDT1,
+                *    but find the object already exists
+                */
+               CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
+
+               rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
+                                      NULL, NULL);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
+                       /* decrease for ".." */
+                       rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
+                                              0, NULL, NULL);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
+
+               rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
+                                      NULL);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
+               /* Increase batchid to add this orphan object deletion
+                * to separate transaction */
+               osp_md_add_update_batchid(update);
+       }
+
        rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
                               bufs);
-       if (rc) {
+out:
+       if (rc)
                CERROR("%s: Insert update error: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name, rc);
-       }
 
        return rc;
 }
@@ -497,7 +545,7 @@ static int osp_md_declare_attr_set(const struct lu_env *env,
        LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
        obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
                     attr->la_valid);
-       lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+       lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
        obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
 
        buf = (char *)&osi->osi_obdo;
@@ -531,6 +579,7 @@ static int osp_md_declare_xattr_set(const struct lu_env *env,
        char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
        int                     rc;
 
+       LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
        update = osp_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
@@ -613,8 +662,9 @@ static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
        LASSERT(size > 0 && size < CFS_PAGE_SIZE);
        LASSERT(ea_buf != NULL);
 
-       buf->lb_len = size;
-       memcpy(buf->lb_buf, ea_buf, size);
+       rc = size;
+       if (buf->lb_buf != NULL)
+               memcpy(buf->lb_buf, ea_buf, size);
 out:
        if (req != NULL)
                ptlrpc_req_finished(req);
@@ -884,9 +934,11 @@ static void osp_it_put(const struct lu_env *env, struct dt_it *di)
 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
 {
        struct dt_object *dt = (struct dt_object *)di;
-       struct osp_object *osp_obj = dt2osp_obj(dt);
-       if (osp_obj->opo_empty)
+       struct osp_object *o = dt2osp_obj(dt);
+
+       if (o->opo_empty)
                return 1;
+
        return 0;
 }
 
@@ -1045,6 +1097,44 @@ static int osp_md_object_destroy(const struct lu_env *env,
        RETURN(rc);
 }
 
+static int osp_md_object_lock(const struct lu_env *env,
+                             struct dt_object *dt,
+                             struct lustre_handle *lh,
+                             struct ldlm_enqueue_info *einfo,
+                             void *policy)
+{
+       struct osp_thread_info *info = osp_env_info(env);
+       struct ldlm_res_id     *res_id = &info->osi_resid;
+       struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct osp_device      *osp = dt2osp_dev(dt_dev);
+       struct ptlrpc_request  *req = NULL;
+       int                     rc = 0;
+       __u64                   flags = 0;
+       ldlm_mode_t             mode;
+
+       fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
+
+       mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
+                              LDLM_FL_BLOCK_GRANTED, res_id,
+                              einfo->ei_type,
+                              (ldlm_policy_data_t *)policy,
+                              einfo->ei_mode, lh, 0);
+       if (mode > 0)
+               return ELDLM_OK;
+
+       req = ldlm_enqueue_pack(osp->opd_exp, 0);
+       if (IS_ERR(req))
+               RETURN(PTR_ERR(req));
+
+       rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
+                             (const ldlm_policy_data_t *)policy,
+                             &flags, NULL, 0, LVB_T_NONE, lh, 0);
+
+       ptlrpc_req_finished(req);
+
+       return rc == ELDLM_OK ? 0 : -EIO;
+}
+
 struct dt_object_operations osp_md_obj_ops = {
        .do_read_lock         = osp_md_object_read_lock,
        .do_write_lock        = osp_md_object_write_lock,
@@ -1067,5 +1157,5 @@ struct dt_object_operations osp_md_obj_ops = {
        .do_xattr_set         = osp_md_xattr_set,
        .do_xattr_get         = osp_md_xattr_get,
        .do_index_try         = osp_md_index_try,
+       .do_object_lock       = osp_md_object_lock,
 };
-