int llog_initiator_connect(struct llog_ctxt *ctxt);
struct llog_operations {
+ int (*lop_declare_destroy)(const struct lu_env *env,
+ struct llog_handle *handle, struct thandle *th);
int (*lop_destroy)(const struct lu_env *env,
- struct llog_handle *handle);
+ struct llog_handle *handle, struct thandle *th);
int (*lop_next_block)(const struct lu_env *env, struct llog_handle *h,
int *curr_idx, int next_idx, __u64 *offset,
void *buf, int len);
return (llog_group_ctxt_null(&obd->obd_olg, index));
}
-static inline int llog_destroy(const struct lu_env *env,
- struct llog_handle *handle)
-{
- struct llog_operations *lop;
- int rc;
-
- ENTRY;
-
- rc = llog_handle2ops(handle, &lop);
- if (rc)
- RETURN(rc);
- if (lop->lop_destroy == NULL)
- RETURN(-EOPNOTSUPP);
-
- rc = lop->lop_destroy(env, handle);
- RETURN(rc);
-}
-
static inline int llog_next_block(const struct lu_env *env,
struct llog_handle *loghandle, int *cur_idx,
int next_idx, __u64 *cur_offset, void *buf,
struct llog_handle *loghandle, struct thandle *th);
int llog_create(const struct lu_env *env, struct llog_handle *handle,
struct thandle *th);
+int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
+ struct thandle *th);
+int llog_destroy(const struct lu_env *env, struct llog_handle *handle);
+
int llog_declare_write_rec(const struct lu_env *env,
struct llog_handle *handle,
struct llog_rec_hdr *rec, int idx,
llog_free_handle(loghandle);
}
-static int llog_cancel_rec_internal(const struct lu_env *env,
- struct llog_handle *loghandle, int index)
+static int llog_declare_destroy(const struct lu_env *env,
+ struct llog_handle *handle,
+ struct thandle *th)
+{
+ struct llog_operations *lop;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_declare_destroy == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_declare_destroy(env, handle, th);
+
+ RETURN(rc);
+}
+
+int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
+ struct thandle *th)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc < 0)
+ RETURN(rc);
+ if (lop->lop_destroy == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ LASSERT(handle->lgh_obj != NULL);
+ if (!dt_object_exists(handle->lgh_obj))
+ RETURN(0);
+
+ rc = lop->lop_destroy(env, handle, th);
+
+ RETURN(rc);
+}
+
+int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
+{
+ struct llog_operations *lop;
+ struct dt_device *dt;
+ struct thandle *th;
+ int rc;
+
+ ENTRY;
+
+ rc = llog_handle2ops(handle, &lop);
+ if (rc < 0)
+ RETURN(rc);
+ if (lop->lop_destroy == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ if (handle->lgh_obj == NULL) {
+ /* if lgh_obj == NULL, then it is from client side destroy */
+ rc = lop->lop_destroy(env, handle, NULL);
+ RETURN(rc);
+ }
+
+ if (!dt_object_exists(handle->lgh_obj))
+ RETURN(0);
+
+ dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
+
+ th = dt_trans_create(env, dt);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = llog_declare_destroy(env, handle, th);
+ if (rc != 0)
+ GOTO(out_trans, rc);
+
+ rc = dt_trans_start_local(env, dt, th);
+ if (rc < 0)
+ GOTO(out_trans, rc);
+
+ rc = lop->lop_destroy(env, handle, th);
+
+out_trans:
+ dt_trans_stop(env, dt, th);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_destroy);
+
+/* returns negative on error; 0 if success; 1 if success & log destroyed */
+int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
+ int index)
{
struct dt_device *dt;
struct llog_log_hdr *llh = loghandle->lgh_hdr;
ENTRY;
- LASSERT(loghandle);
- LASSERT(loghandle->lgh_ctxt);
+ CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n", index,
+ POSTID(&loghandle->lgh_id.lgl_oi));
+
+ if (index == 0) {
+ CERROR("Can't cancel index 0 which is header\n");
+ RETURN(-EINVAL);
+ }
+
+ LASSERT(loghandle != NULL);
+ LASSERT(loghandle->lgh_ctxt != NULL);
LASSERT(loghandle->lgh_obj != NULL);
dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
if (rc < 0)
GOTO(out_trans, rc);
+ if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY))
+ rc = llog_declare_destroy(env, loghandle, th);
+
th->th_wait_submit = 1;
rc = dt_trans_start_local(env, dt, th);
if (rc < 0)
loghandle->lgh_hdr->llh_count--;
else
ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
-out_unlock:
- mutex_unlock(&loghandle->lgh_hdr_mutex);
- up_write(&loghandle->lgh_lock);
-out_trans:
- dt_trans_stop(env, dt, th);
- RETURN(rc);
-}
-
-/* returns negative on error; 0 if success; 1 if success & log destroyed */
-int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
- int index)
-{
- struct llog_log_hdr *llh = loghandle->lgh_hdr;
- int rc = 0;
- ENTRY;
-
- CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n",
- index, POSTID(&loghandle->lgh_id.lgl_oi));
-
- if (index == 0) {
- CERROR("Can't cancel index 0 which is header\n");
- RETURN(-EINVAL);
- }
-
- rc = llog_cancel_rec_internal(env, loghandle, index);
- if (rc < 0) {
- CERROR("%s: fail to write header for llog #"DOSTID
- "#%08x: rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
- POSTID(&loghandle->lgh_id.lgl_oi),
- loghandle->lgh_id.lgl_ogen, rc);
- RETURN(rc);
- }
if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
(llh->llh_count == 1) &&
(loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
- rc = llog_destroy(env, loghandle);
+ rc = llog_trans_destroy(env, loghandle, th);
if (rc < 0) {
/* Sigh, can not destroy the final plain llog, but
* the bitmap has been clearly, so the record can not
loghandle->lgh_ctxt->loc_obd->obd_name,
POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, rc);
- RETURN(0);
+ GOTO(out_unlock, rc = 0);
}
- RETURN(LLOG_DEL_PLAIN);
+ rc = LLOG_DEL_PLAIN;
}
- RETURN(0);
+out_unlock:
+ mutex_unlock(&loghandle->lgh_hdr_mutex);
+ up_write(&loghandle->lgh_lock);
+out_trans:
+ dt_trans_stop(env, dt, th);
+ RETURN(rc);
}
static int llog_read_header(const struct lu_env *env,
if (idx == LLOG_HEADER_IDX) {
/* llog header update */
- LASSERT(reclen >= sizeof(struct llog_log_hdr));
- LASSERT(rec == &llh->llh_hdr);
-
lgi->lgi_off = 0;
lgi->lgi_buf.lb_len = reclen;
lgi->lgi_buf.lb_buf = rec;
RETURN(rc);
}
-
/**
- * Implementation of the llog_operations::lop_destroy
+ * Implementation of the llog_operations::lop_declare_destroy
*
- * This function destroys the llog and deletes also entry in the
+ * This function declare destroys the llog and deletes also entry in the
* llog directory in case of named llog. Llog should be opened prior that.
- * Destroy method is not part of external transaction and does everything
- * inside.
*
* \param[in] env execution environment
* \param[in] loghandle llog handle of the current llog
* \retval 0 on successful destroy
* \retval negative value on error
*/
-static int llog_osd_destroy(const struct lu_env *env,
- struct llog_handle *loghandle)
+static int llog_osd_declare_destroy(const struct lu_env *env,
+ struct llog_handle *loghandle,
+ struct thandle *th)
{
struct llog_ctxt *ctxt;
struct dt_object *o, *llog_dir = NULL;
- struct dt_device *d;
- struct thandle *th;
- char *name = NULL;
int rc;
ENTRY;
o = loghandle->lgh_obj;
LASSERT(o);
- d = lu2dt_dev(o->do_lu.lo_dev);
- LASSERT(d);
- LASSERT(d == ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt);
-
- th = dt_trans_create(env, d);
- if (IS_ERR(th))
- RETURN(PTR_ERR(th));
-
if (loghandle->lgh_name) {
llog_dir = llog_osd_dir_get(env, ctxt);
if (IS_ERR(llog_dir))
- GOTO(out_trans, rc = PTR_ERR(llog_dir));
+ RETURN(PTR_ERR(llog_dir));
- name = loghandle->lgh_name;
rc = dt_declare_delete(env, llog_dir,
- (struct dt_key *)name, th);
- if (rc)
- GOTO(out_trans, rc);
+ (struct dt_key *)loghandle->lgh_name,
+ th);
+ if (rc < 0)
+ GOTO(out_put, rc);
}
rc = dt_declare_ref_del(env, o, th);
if (rc < 0)
- GOTO(out_trans, rc);
+ GOTO(out_put, rc);
rc = dt_declare_destroy(env, o, th);
- if (rc)
- GOTO(out_trans, rc);
+ if (rc < 0)
+ GOTO(out_put, rc);
if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
rc = llog_osd_regular_fid_del_name_entry(env, o, th, true);
if (rc < 0)
- GOTO(out_trans, rc);
+ GOTO(out_put, rc);
}
- rc = dt_trans_start_local(env, d, th);
- if (rc)
- GOTO(out_trans, rc);
+out_put:
+ if (!(IS_ERR_OR_NULL(llog_dir)))
+ lu_object_put(env, &llog_dir->do_lu);
- th->th_wait_submit = 1;
+ RETURN(rc);
+}
+
+
+/**
+ * Implementation of the llog_operations::lop_destroy
+ *
+ * This function destroys the llog and deletes also entry in the
+ * llog directory in case of named llog. Llog should be opened prior that.
+ * Destroy method is not part of external transaction and does everything
+ * inside.
+ *
+ * \param[in] env execution environment
+ * \param[in] loghandle llog handle of the current llog
+ *
+ * \retval 0 on successful destroy
+ * \retval negative value on error
+ */
+static int llog_osd_destroy(const struct lu_env *env,
+ struct llog_handle *loghandle, struct thandle *th)
+{
+ struct llog_ctxt *ctxt;
+ struct dt_object *o, *llog_dir = NULL;
+ int rc;
+
+ ENTRY;
+
+ ctxt = loghandle->lgh_ctxt;
+ LASSERT(ctxt != NULL);
+
+ o = loghandle->lgh_obj;
+ LASSERT(o != NULL);
dt_write_lock(env, o, 0);
- if (dt_object_exists(o)) {
- if (name) {
- dt_read_lock(env, llog_dir, 0);
- rc = dt_delete(env, llog_dir,
- (struct dt_key *) name,
- th);
- dt_read_unlock(env, llog_dir);
- if (rc) {
- CERROR("%s: can't remove llog %s: rc = %d\n",
- o->do_lu.lo_dev->ld_obd->obd_name,
- name, rc);
- GOTO(out_unlock, rc);
- }
- }
- dt_ref_del(env, o, th);
- rc = dt_destroy(env, o, th);
- if (rc)
- GOTO(out_unlock, rc);
+ if (!dt_object_exists(o))
+ GOTO(out_unlock, rc = 0);
- if (loghandle->lgh_ctxt->loc_flags &
- LLOG_CTXT_FLAG_NORMAL_FID) {
- rc = llog_osd_regular_fid_del_name_entry(env, o, th,
- false);
- if (rc < 0)
- GOTO(out_unlock, rc);
+ if (loghandle->lgh_name) {
+ llog_dir = llog_osd_dir_get(env, ctxt);
+ if (IS_ERR(llog_dir))
+ RETURN(PTR_ERR(llog_dir));
+
+ dt_read_lock(env, llog_dir, 0);
+ rc = dt_delete(env, llog_dir,
+ (struct dt_key *)loghandle->lgh_name,
+ th);
+ dt_read_unlock(env, llog_dir);
+ if (rc) {
+ CERROR("%s: can't remove llog %s: rc = %d\n",
+ o->do_lu.lo_dev->ld_obd->obd_name,
+ loghandle->lgh_name, rc);
+ GOTO(out_unlock, rc);
}
}
+
+ dt_ref_del(env, o, th);
+ rc = dt_destroy(env, o, th);
+ if (rc < 0)
+ GOTO(out_unlock, rc);
+
+ if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+ rc = llog_osd_regular_fid_del_name_entry(env, o, th, false);
+ if (rc < 0)
+ GOTO(out_unlock, rc);
+ }
+
out_unlock:
dt_write_unlock(env, o);
-out_trans:
- dt_trans_stop(env, d, th);
if (!(IS_ERR_OR_NULL(llog_dir)))
lu_object_put(env, &llog_dir->do_lu);
RETURN(rc);
.lop_next_block = llog_osd_next_block,
.lop_prev_block = llog_osd_prev_block,
.lop_read_header = llog_osd_read_header,
+ .lop_declare_destroy = llog_osd_declare_destroy,
.lop_destroy = llog_osd_destroy,
.lop_setup = llog_osd_setup,
.lop_cleanup = llog_osd_cleanup,
.lop_next_block = llog_osd_next_block,
.lop_prev_block = llog_osd_prev_block,
.lop_read_header = llog_osd_read_header,
+ .lop_declare_destroy = llog_osd_declare_destroy,
.lop_destroy = llog_osd_destroy,
.lop_setup = llog_osd_setup,
.lop_cleanup = llog_osd_cleanup,
}
/**
- * Interpreter call for object destroy
- *
- * Object destroy interpreter, which will be called after destroying
- * the remote object to set flags and status.
- *
- * \param[in] env execution environment
- * \param[in] reply update reply
- * \param[in] req ptlrpc update request for destroying object
- * \param[in] obj object to be destroyed
- * \param[in] data data used in this function.
- * \param[in] index index(position) of destroy update in the whole
- * updates
- * \param[in] rc update result on the remote MDT.
- *
- * \retval only return 0 for now
- */
-static int osp_md_object_destroy_interpreter(const struct lu_env *env,
- struct object_update_reply *reply,
- struct ptlrpc_request *req,
- struct osp_object *obj,
- void *data, int index, int rc)
-{
- /* not needed in cache any more */
- set_bit(LU_OBJECT_HEARD_BANSHEE,
- &obj->opo_obj.do_lu.lo_header->loh_flags);
- return 0;
-}
-
-/**
* Implement OSP layer dt_object_operations::do_destroy() interface.
*
* Pack the destroy update into the RPC buffer, which will be sent
if (rc != 0)
RETURN(rc);
+ set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL,
- osp_md_object_destroy_interpreter);
+ NULL);
+
RETURN(rc);
}
}
static int llog_client_destroy(const struct lu_env *env,
- struct llog_handle *loghandle)
+ struct llog_handle *loghandle,
+ struct thandle *th)
{
struct obd_import *imp;
struct ptlrpc_request *req = NULL;