Whamcloud - gitweb
LU-6846 llog: combine cancel rec and destroy 30/15730/7
authorwang di <di.wang@intel.com>
Sat, 25 Jul 2015 16:21:46 +0000 (09:21 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 19 Aug 2015 16:01:50 +0000 (16:01 +0000)
Merge cancel llog record and llog object destroy in
a single transaction. Otherwise it will cause the
race between cancel record and llog object destroy,
especally when deleting remote record, i.e. if delete
record RPC arrives after destroy, then delete record
will not find the object, cause LBUG.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I33773eeb859fffa871ce01689c5bf1b4196a6658
Reviewed-on: http://review.whamcloud.com/15730
Tested-by: Jenkins
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_log.h
lustre/obdclass/llog.c
lustre/obdclass/llog_osd.c
lustre/osp/osp_md_object.c
lustre/ptlrpc/llog_client.c

index 458cc2e..393ce28 100644 (file)
@@ -218,8 +218,10 @@ int llog_catalog_list(const struct lu_env *env, struct dt_device *d,
 int llog_initiator_connect(struct llog_ctxt *ctxt);
 
 struct llog_operations {
+       int (*lop_declare_destroy)(const struct lu_env *env,
+                          struct llog_handle *handle, struct thandle *th);
        int (*lop_destroy)(const struct lu_env *env,
-                          struct llog_handle *handle);
+                          struct llog_handle *handle, struct thandle *th);
        int (*lop_next_block)(const struct lu_env *env, struct llog_handle *h,
                              int *curr_idx, int next_idx, __u64 *offset,
                              void *buf, int len);
@@ -467,24 +469,6 @@ static inline int llog_ctxt_null(struct obd_device *obd, int index)
         return (llog_group_ctxt_null(&obd->obd_olg, index));
 }
 
-static inline int llog_destroy(const struct lu_env *env,
-                              struct llog_handle *handle)
-{
-       struct llog_operations *lop;
-       int rc;
-
-       ENTRY;
-
-       rc = llog_handle2ops(handle, &lop);
-       if (rc)
-               RETURN(rc);
-       if (lop->lop_destroy == NULL)
-               RETURN(-EOPNOTSUPP);
-
-       rc = lop->lop_destroy(env, handle);
-       RETURN(rc);
-}
-
 static inline int llog_next_block(const struct lu_env *env,
                                  struct llog_handle *loghandle, int *cur_idx,
                                  int next_idx, __u64 *cur_offset, void *buf,
@@ -564,6 +548,10 @@ int llog_declare_create(const struct lu_env *env,
                        struct llog_handle *loghandle, struct thandle *th);
 int llog_create(const struct lu_env *env, struct llog_handle *handle,
                struct thandle *th);
+int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
+                      struct thandle *th);
+int llog_destroy(const struct lu_env *env, struct llog_handle *handle);
+
 int llog_declare_write_rec(const struct lu_env *env,
                           struct llog_handle *handle,
                           struct llog_rec_hdr *rec, int idx,
index 334f752..3aeea1d 100644 (file)
@@ -104,8 +104,98 @@ void llog_handle_put(struct llog_handle *loghandle)
                llog_free_handle(loghandle);
 }
 
-static int llog_cancel_rec_internal(const struct lu_env *env,
-                                   struct llog_handle *loghandle, int index)
+static int llog_declare_destroy(const struct lu_env *env,
+                               struct llog_handle *handle,
+                               struct thandle *th)
+{
+       struct llog_operations *lop;
+       int rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc)
+               RETURN(rc);
+       if (lop->lop_declare_destroy == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       rc = lop->lop_declare_destroy(env, handle, th);
+
+       RETURN(rc);
+}
+
+int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
+                      struct thandle *th)
+{
+       struct llog_operations  *lop;
+       int rc;
+       ENTRY;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc < 0)
+               RETURN(rc);
+       if (lop->lop_destroy == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       LASSERT(handle->lgh_obj != NULL);
+       if (!dt_object_exists(handle->lgh_obj))
+               RETURN(0);
+
+       rc = lop->lop_destroy(env, handle, th);
+
+       RETURN(rc);
+}
+
+int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
+{
+       struct llog_operations  *lop;
+       struct dt_device        *dt;
+       struct thandle          *th;
+       int rc;
+
+       ENTRY;
+
+       rc = llog_handle2ops(handle, &lop);
+       if (rc < 0)
+               RETURN(rc);
+       if (lop->lop_destroy == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       if (handle->lgh_obj == NULL) {
+               /* if lgh_obj == NULL, then it is from client side destroy */
+               rc = lop->lop_destroy(env, handle, NULL);
+               RETURN(rc);
+       }
+
+       if (!dt_object_exists(handle->lgh_obj))
+               RETURN(0);
+
+       dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
+
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = llog_declare_destroy(env, handle, th);
+       if (rc != 0)
+               GOTO(out_trans, rc);
+
+       rc = dt_trans_start_local(env, dt, th);
+       if (rc < 0)
+               GOTO(out_trans, rc);
+
+       rc = lop->lop_destroy(env, handle, th);
+
+out_trans:
+       dt_trans_stop(env, dt, th);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(llog_destroy);
+
+/* returns negative on error; 0 if success; 1 if success & log destroyed */
+int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
+                   int index)
 {
        struct dt_device        *dt;
        struct llog_log_hdr     *llh = loghandle->lgh_hdr;
@@ -114,8 +204,16 @@ static int llog_cancel_rec_internal(const struct lu_env *env,
 
        ENTRY;
 
-       LASSERT(loghandle);
-       LASSERT(loghandle->lgh_ctxt);
+       CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n", index,
+              POSTID(&loghandle->lgh_id.lgl_oi));
+
+       if (index == 0) {
+               CERROR("Can't cancel index 0 which is header\n");
+               RETURN(-EINVAL);
+       }
+
+       LASSERT(loghandle != NULL);
+       LASSERT(loghandle->lgh_ctxt != NULL);
        LASSERT(loghandle->lgh_obj != NULL);
 
        dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
@@ -128,6 +226,9 @@ static int llog_cancel_rec_internal(const struct lu_env *env,
        if (rc < 0)
                GOTO(out_trans, rc);
 
+       if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY))
+               rc = llog_declare_destroy(env, loghandle, th);
+
        th->th_wait_submit = 1;
        rc = dt_trans_start_local(env, dt, th);
        if (rc < 0)
@@ -147,44 +248,11 @@ static int llog_cancel_rec_internal(const struct lu_env *env,
                loghandle->lgh_hdr->llh_count--;
        else
                ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
-out_unlock:
-       mutex_unlock(&loghandle->lgh_hdr_mutex);
-       up_write(&loghandle->lgh_lock);
-out_trans:
-       dt_trans_stop(env, dt, th);
-       RETURN(rc);
-}
-
-/* returns negative on error; 0 if success; 1 if success & log destroyed */
-int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
-                   int index)
-{
-        struct llog_log_hdr *llh = loghandle->lgh_hdr;
-        int rc = 0;
-        ENTRY;
-
-        CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n",
-               index, POSTID(&loghandle->lgh_id.lgl_oi));
-
-        if (index == 0) {
-                CERROR("Can't cancel index 0 which is header\n");
-                RETURN(-EINVAL);
-        }
-
-       rc = llog_cancel_rec_internal(env, loghandle, index);
-       if (rc < 0) {
-               CERROR("%s: fail to write header for llog #"DOSTID
-                      "#%08x: rc = %d\n",
-                      loghandle->lgh_ctxt->loc_obd->obd_name,
-                      POSTID(&loghandle->lgh_id.lgl_oi),
-                      loghandle->lgh_id.lgl_ogen, rc);
-               RETURN(rc);
-       }
 
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
            (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
-               rc = llog_destroy(env, loghandle);
+               rc = llog_trans_destroy(env, loghandle, th);
                if (rc < 0) {
                        /* Sigh, can not destroy the final plain llog, but
                         * the bitmap has been clearly, so the record can not
@@ -195,12 +263,17 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                               loghandle->lgh_ctxt->loc_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, rc);
-                       RETURN(0);
+                       GOTO(out_unlock, rc = 0);
                }
-               RETURN(LLOG_DEL_PLAIN);
+               rc = LLOG_DEL_PLAIN;
        }
 
-       RETURN(0);
+out_unlock:
+       mutex_unlock(&loghandle->lgh_hdr_mutex);
+       up_write(&loghandle->lgh_lock);
+out_trans:
+       dt_trans_stop(env, dt, th);
+       RETURN(rc);
 }
 
 static int llog_read_header(const struct lu_env *env,
index 519fd27..4912db9 100644 (file)
@@ -422,9 +422,6 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
                if (idx == LLOG_HEADER_IDX) {
                        /* llog header update */
-                       LASSERT(reclen >= sizeof(struct llog_log_hdr));
-                       LASSERT(rec == &llh->llh_hdr);
-
                        lgi->lgi_off = 0;
                        lgi->lgi_buf.lb_len = reclen;
                        lgi->lgi_buf.lb_buf = rec;
@@ -1482,14 +1479,11 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
        RETURN(rc);
 }
 
-
 /**
- * Implementation of the llog_operations::lop_destroy
+ * Implementation of the llog_operations::lop_declare_destroy
  *
- * This function destroys the llog and deletes also entry in the
+ * This function declare destroys the llog and deletes also entry in the
  * llog directory in case of named llog. Llog should be opened prior that.
- * Destroy method is not part of external transaction and does everything
- * inside.
  *
  * \param[in] env              execution environment
  * \param[in] loghandle        llog handle of the current llog
@@ -1497,14 +1491,12 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
  * \retval             0 on successful destroy
  * \retval             negative value on error
  */
-static int llog_osd_destroy(const struct lu_env *env,
-                           struct llog_handle *loghandle)
+static int llog_osd_declare_destroy(const struct lu_env *env,
+                                   struct llog_handle *loghandle,
+                                   struct thandle *th)
 {
        struct llog_ctxt        *ctxt;
        struct dt_object        *o, *llog_dir = NULL;
-       struct dt_device        *d;
-       struct thandle          *th;
-       char                    *name = NULL;
        int                      rc;
 
        ENTRY;
@@ -1515,78 +1507,104 @@ static int llog_osd_destroy(const struct lu_env *env,
        o = loghandle->lgh_obj;
        LASSERT(o);
 
-       d = lu2dt_dev(o->do_lu.lo_dev);
-       LASSERT(d);
-       LASSERT(d == ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt);
-
-       th = dt_trans_create(env, d);
-       if (IS_ERR(th))
-               RETURN(PTR_ERR(th));
-
        if (loghandle->lgh_name) {
                llog_dir = llog_osd_dir_get(env, ctxt);
                if (IS_ERR(llog_dir))
-                       GOTO(out_trans, rc = PTR_ERR(llog_dir));
+                       RETURN(PTR_ERR(llog_dir));
 
-               name = loghandle->lgh_name;
                rc = dt_declare_delete(env, llog_dir,
-                                      (struct dt_key *)name, th);
-               if (rc)
-                       GOTO(out_trans, rc);
+                                      (struct dt_key *)loghandle->lgh_name,
+                                      th);
+               if (rc < 0)
+                       GOTO(out_put, rc);
        }
 
        rc = dt_declare_ref_del(env, o, th);
        if (rc < 0)
-               GOTO(out_trans, rc);
+               GOTO(out_put, rc);
 
        rc = dt_declare_destroy(env, o, th);
-       if (rc)
-               GOTO(out_trans, rc);
+       if (rc < 0)
+               GOTO(out_put, rc);
 
        if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
                rc = llog_osd_regular_fid_del_name_entry(env, o, th, true);
                if (rc < 0)
-                       GOTO(out_trans, rc);
+                       GOTO(out_put, rc);
        }
 
-       rc = dt_trans_start_local(env, d, th);
-       if (rc)
-               GOTO(out_trans, rc);
+out_put:
+       if (!(IS_ERR_OR_NULL(llog_dir)))
+               lu_object_put(env, &llog_dir->do_lu);
 
-       th->th_wait_submit = 1;
+       RETURN(rc);
+}
+
+
+/**
+ * Implementation of the llog_operations::lop_destroy
+ *
+ * This function destroys the llog and deletes also entry in the
+ * llog directory in case of named llog. Llog should be opened prior that.
+ * Destroy method is not part of external transaction and does everything
+ * inside.
+ *
+ * \param[in] env              execution environment
+ * \param[in] loghandle        llog handle of the current llog
+ *
+ * \retval             0 on successful destroy
+ * \retval             negative value on error
+ */
+static int llog_osd_destroy(const struct lu_env *env,
+                           struct llog_handle *loghandle, struct thandle *th)
+{
+       struct llog_ctxt        *ctxt;
+       struct dt_object        *o, *llog_dir = NULL;
+       int                      rc;
+
+       ENTRY;
+
+       ctxt = loghandle->lgh_ctxt;
+       LASSERT(ctxt != NULL);
+
+       o = loghandle->lgh_obj;
+       LASSERT(o != NULL);
 
        dt_write_lock(env, o, 0);
-       if (dt_object_exists(o)) {
-               if (name) {
-                       dt_read_lock(env, llog_dir, 0);
-                       rc = dt_delete(env, llog_dir,
-                                      (struct dt_key *) name,
-                                      th);
-                       dt_read_unlock(env, llog_dir);
-                       if (rc) {
-                               CERROR("%s: can't remove llog %s: rc = %d\n",
-                                      o->do_lu.lo_dev->ld_obd->obd_name,
-                                      name, rc);
-                               GOTO(out_unlock, rc);
-                       }
-               }
-               dt_ref_del(env, o, th);
-               rc = dt_destroy(env, o, th);
-               if (rc)
-                       GOTO(out_unlock, rc);
+       if (!dt_object_exists(o))
+               GOTO(out_unlock, rc = 0);
 
-               if (loghandle->lgh_ctxt->loc_flags &
-                                               LLOG_CTXT_FLAG_NORMAL_FID) {
-                       rc = llog_osd_regular_fid_del_name_entry(env, o, th,
-                                                                false);
-                       if (rc < 0)
-                               GOTO(out_unlock, rc);
+       if (loghandle->lgh_name) {
+               llog_dir = llog_osd_dir_get(env, ctxt);
+               if (IS_ERR(llog_dir))
+                       RETURN(PTR_ERR(llog_dir));
+
+               dt_read_lock(env, llog_dir, 0);
+               rc = dt_delete(env, llog_dir,
+                              (struct dt_key *)loghandle->lgh_name,
+                              th);
+               dt_read_unlock(env, llog_dir);
+               if (rc) {
+                       CERROR("%s: can't remove llog %s: rc = %d\n",
+                              o->do_lu.lo_dev->ld_obd->obd_name,
+                              loghandle->lgh_name, rc);
+                       GOTO(out_unlock, rc);
                }
        }
+
+       dt_ref_del(env, o, th);
+       rc = dt_destroy(env, o, th);
+       if (rc < 0)
+               GOTO(out_unlock, rc);
+
+       if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
+               if (rc < 0)
+                       GOTO(out_unlock, rc);
+       }
+
 out_unlock:
        dt_write_unlock(env, o);
-out_trans:
-       dt_trans_stop(env, d, th);
        if (!(IS_ERR_OR_NULL(llog_dir)))
                lu_object_put(env, &llog_dir->do_lu);
        RETURN(rc);
@@ -1683,6 +1701,7 @@ struct llog_operations llog_osd_ops = {
        .lop_next_block         = llog_osd_next_block,
        .lop_prev_block         = llog_osd_prev_block,
        .lop_read_header        = llog_osd_read_header,
+       .lop_declare_destroy    = llog_osd_declare_destroy,
        .lop_destroy            = llog_osd_destroy,
        .lop_setup              = llog_osd_setup,
        .lop_cleanup            = llog_osd_cleanup,
@@ -1700,6 +1719,7 @@ struct llog_operations llog_common_cat_ops = {
        .lop_next_block         = llog_osd_next_block,
        .lop_prev_block         = llog_osd_prev_block,
        .lop_read_header        = llog_osd_read_header,
+       .lop_declare_destroy    = llog_osd_declare_destroy,
        .lop_destroy            = llog_osd_destroy,
        .lop_setup              = llog_osd_setup,
        .lop_cleanup            = llog_osd_cleanup,
index 6738763..af49456 100644 (file)
@@ -956,35 +956,6 @@ int osp_md_declare_object_destroy(const struct lu_env *env,
 }
 
 /**
- * Interpreter call for object destroy
- *
- * Object destroy interpreter, which will be called after destroying
- * the remote object to set flags and status.
- *
- * \param[in] env      execution environment
- * \param[in] reply    update reply
- * \param[in] req      ptlrpc update request for destroying object
- * \param[in] obj      object to be destroyed
- * \param[in] data     data used in this function.
- * \param[in] index    index(position) of destroy update in the whole
- *                      updates
- * \param[in] rc       update result on the remote MDT.
- *
- * \retval             only return 0 for now
- */
-static int osp_md_object_destroy_interpreter(const struct lu_env *env,
-                                            struct object_update_reply *reply,
-                                            struct ptlrpc_request *req,
-                                            struct osp_object *obj,
-                                            void *data, int index, int rc)
-{
-       /* not needed in cache any more */
-       set_bit(LU_OBJECT_HEARD_BANSHEE,
-               &obj->opo_obj.do_lu.lo_header->loh_flags);
-       return 0;
-}
-
-/**
  * Implement OSP layer dt_object_operations::do_destroy() interface.
  *
  * Pack the destroy update into the RPC buffer, which will be sent
@@ -1019,8 +990,10 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                RETURN(rc);
 
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
        rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL,
-                                       osp_md_object_destroy_interpreter);
+                                       NULL);
+
        RETURN(rc);
 }
 
index e618714..4860542 100644 (file)
@@ -141,7 +141,8 @@ out:
 }
 
 static int llog_client_destroy(const struct lu_env *env,
-                              struct llog_handle *loghandle)
+                              struct llog_handle *loghandle,
+                              struct thandle *th)
 {
         struct obd_import     *imp;
         struct ptlrpc_request *req = NULL;