X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter_log.c;h=3fd9bc85d9a915b99ca69cc18f383abdd8f929b9;hb=fdb061c9ce3fd16e805ff07b1e80c8de46110004;hp=d91e1e853a3ce782143036df41e224ce39bd19f9;hpb=a85b25f04f6ba5bb45c116927c9e5d0e4639dc97;p=fs%2Flustre-release.git diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index d91e1e8..3fd9bc8 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -30,351 +30,163 @@ #include #include -#include +#include #include #include #include #include "filter_internal.h" -static struct llog_handle *filter_log_create(struct obd_device *obd); - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static int filter_log_close(struct llog_handle *cathandle, - struct llog_handle *loghandle) +int filter_log_sz_change(struct obd_device *obd, + struct lustre_id *id, __u32 io_epoch, + struct llog_cookie *logcookie, + struct inode *inode) { - struct llog_object_hdr *llh = loghandle->lgh_hdr; - struct file *file = loghandle->lgh_file; - struct dentry *dparent = NULL, *dchild = NULL; - struct lustre_handle parent_lockh; - struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl; + struct llog_size_change_rec *lsc; + struct ost_filterdata *ofd; + struct llog_ctxt *ctxt; int rc; ENTRY; - /* If we are going to delete this log, grab a ref before we close - * it so we don't have to immediately do another lookup. */ - if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){ - CDEBUG(D_INODE, "deleting log file "LPX64":%x\n", - lgl->lgl_oid, lgl->lgl_ogen); - dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG, - lgl->lgl_oid,LCK_PW,&parent_lockh); - if (IS_ERR(dparent)) { - rc = PTR_ERR(dparent); - CERROR("error locking parent, orphan log %*s: rc %d\n", - file->f_dentry->d_name.len, - file->f_dentry->d_name.name, rc); - RETURN(rc); - } else { - dchild = dget(file->f_dentry); - llog_delete_log(cathandle, loghandle); - } - } else { - CDEBUG(D_INODE, "closing log file "LPX64":%x\n", - lgl->lgl_oid, lgl->lgl_ogen); + down(&inode->i_sem); + ofd = (struct ost_filterdata *) LUSTRE_FILTERDATA(inode); + + if (ofd && ofd->ofd_epoch >= io_epoch) { + if (ofd->ofd_epoch > io_epoch) + CERROR("client sent old epoch %d for obj ino %ld\n", + io_epoch, inode->i_ino); + up(&inode->i_sem); + RETURN(0); } - rc = filp_close(file, 0); - - llog_free_handle(loghandle); /* also removes loghandle from list */ - - if (dchild != NULL) { - int err = vfs_unlink(dparent->d_inode, dchild); - if (err) { - CERROR("error unlinking empty log %*s: rc %d\n", - dchild->d_name.len, dchild->d_name.name, err); - if (!rc) - rc = err; - } - f_dput(dchild); - ldlm_lock_decref(&parent_lockh, LCK_PW); + if (ofd && ofd->ofd_epoch < io_epoch) { + ofd->ofd_epoch = io_epoch; + } else if (!ofd) { + OBD_ALLOC(ofd, sizeof(*ofd)); + if (!ofd) + GOTO(out, rc = -ENOMEM); + igrab(inode); + LUSTRE_FILTERDATA(inode) = (void *) ofd; + ofd->ofd_epoch = io_epoch; } - RETURN(rc); -} - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static struct llog_handle *filter_log_open(struct obd_device *obd, - struct llog_cookie *logcookie) -{ - struct llog_logid *lgl = &logcookie->lgc_lgl; - struct llog_handle *loghandle; - struct dentry *dchild; - int rc; - ENTRY; - - loghandle = llog_alloc_handle(); - if (!loghandle) - RETURN(ERR_PTR(-ENOMEM)); + /* the decision to write a record is now made, unlock */ + up(&inode->i_sem); - dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid); - if (IS_ERR(dchild)) - GOTO(out_handle, rc = PTR_ERR(dchild)); + OBD_ALLOC(lsc, sizeof(*lsc)); + if (lsc == NULL) + RETURN(-ENOMEM); + lsc->lsc_hdr.lrh_len = lsc->lsc_tail.lrt_len = sizeof(*lsc); + lsc->lsc_hdr.lrh_type = OST_SZ_REC; + lsc->lsc_id = *id; + lsc->lsc_io_epoch = io_epoch; - if (dchild->d_inode == NULL) { - CERROR("logcookie references non-existent object %*s\n", - dchild->d_name.len, dchild->d_name.name); - GOTO(out_dentry, rc = -ENOENT); - } + CDEBUG(D_ERROR, "new epoch %lu for "DLID4"\n", + (unsigned long) io_epoch, OLID4(id)); - if (dchild->d_inode->i_generation != lgl->lgl_ogen) { - CERROR("logcookie for %*s had different generation %x != %x\n", - dchild->d_name.len, dchild->d_name.name, - dchild->d_inode->i_generation, lgl->lgl_ogen); - GOTO(out_dentry, rc = -ESTALE); - } + ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_ORIG_CTXT); + rc = llog_add(ctxt, &lsc->lsc_hdr, NULL, logcookie, 1, NULL,NULL,NULL); + OBD_FREE(lsc, sizeof(*lsc)); - /* dentry_open does a dput(dchild) and mntput(mnt) on error */ - mntget(obd->u.filter.fo_vfsmnt); - loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt, - O_RDWR); - if (IS_ERR(loghandle->lgh_file)) { - rc = PTR_ERR(loghandle->lgh_file); - CERROR("error opening logfile %*s: rc %d\n", - dchild->d_name.len, dchild->d_name.name, rc); - GOTO(out_dentry, rc); + if (rc > 0) { + LASSERT(rc == sizeof(*logcookie)); + rc = 0; } - memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie)); - loghandle->lgh_log_create = filter_log_create; - loghandle->lgh_log_open = filter_log_open; - loghandle->lgh_log_close = filter_log_close; - loghandle->lgh_obd = obd; - RETURN(loghandle); -out_dentry: - f_dput(dchild); -out_handle: - llog_free_handle(loghandle); - RETURN(ERR_PTR(rc)); +out: + RETURN(rc); } -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static struct llog_handle *filter_log_create(struct obd_device *obd) -{ - struct filter_obd *filter = &obd->u.filter; - struct lustre_handle parent_lockh; - struct dentry *dparent, *dchild; - struct llog_handle *loghandle; - struct file *file; - int err, rc; - obd_id id; - ENTRY; - - loghandle = llog_alloc_handle(); - if (!loghandle) - RETURN(ERR_PTR(-ENOMEM)); - - retry: - id = filter_next_id(filter); - - dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh); - if (IS_ERR(dparent)) - GOTO(out_ctxt, rc = PTR_ERR(dparent)); - - dchild = filter_fid2dentry(obd, dparent, S_IFREG, id); - if (IS_ERR(dchild)) - GOTO(out_lock, rc = PTR_ERR(dchild)); - - if (dchild->d_inode != NULL) { - /* This would only happen if lastobjid was bad on disk */ - CERROR("Serious error: objid %*s already exists; is this " - "filesystem corrupt? I will try to work around it.\n", - dchild->d_name.len, dchild->d_name.name); - f_dput(dchild); - ldlm_lock_decref(&parent_lockh, LCK_PW); - goto retry; - } +struct obd_llogs * filter_grab_llog_for_group(struct obd_device *, + int, struct obd_export *); - rc = vfs_create(dparent->d_inode, dchild, S_IFREG); - if (rc) { - CERROR("log create failed rc = %d\n", rc); - GOTO(out_child, rc); - } - - rc = filter_update_server_data(obd, filter->fo_rcvd_filp, - filter->fo_fsd); - if (rc) { - CERROR("can't write lastobjid but log created: rc %d\n",rc); - GOTO(out_destroy, rc); - } - - /* dentry_open does a dput(dchild) and mntput(mnt) on error */ - mntget(filter->fo_vfsmnt); - file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE); - if (IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("error opening log file "LPX64": rc %d\n", id, rc); - GOTO(out_destroy, rc); +/* When this (destroy) operation is committed, return the cancel cookie */ +void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, + void *cb_data, int error) +{ + struct llog_cookie *cookie = cb_data; + struct obd_llogs *llogs = NULL; + struct llog_ctxt *ctxt; + + /* we have to find context for right group */ + llogs = filter_grab_llog_for_group(obd, cookie->lgc_lgl.lgl_ogr, NULL); + + if (llogs) { + ctxt = llog_get_context(llogs, cookie->lgc_subsys + 1); + if (ctxt) { + llog_cancel(ctxt, 1, cookie, 0, NULL); + } else + CERROR("no valid context for group "LPU64"\n", + cookie->lgc_lgl.lgl_ogr); + } else { + CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr); } - ldlm_lock_decref(&parent_lockh, LCK_PW); - - loghandle->lgh_file = file; - loghandle->lgh_cookie.lgc_lgl.lgl_oid = id; - loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation; - loghandle->lgh_log_create = filter_log_create; - loghandle->lgh_log_open = filter_log_open; - loghandle->lgh_log_close = filter_log_close; - loghandle->lgh_obd = obd; - - RETURN(loghandle); -out_destroy: - err = vfs_unlink(dparent->d_inode, dchild); - if (err) - CERROR("error unlinking %*s on error: rc %d\n", - dchild->d_name.len, dchild->d_name.name, err); -out_child: - f_dput(dchild); -out_lock: - ldlm_lock_decref(&parent_lockh, LCK_PW); -out_ctxt: - llog_free_handle(loghandle); - RETURN(ERR_PTR(rc)); + OBD_FREE(cb_data, sizeof(struct llog_cookie)); } -/* This is called from filter_setup() and should be single threaded */ -struct llog_handle *filter_get_catalog(struct obd_device *obd) +/* Callback for processing the unlink log record received from MDS by + * llog_client_api. + */ +int filter_recov_log_unlink_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data) { - struct filter_obd *filter = &obd->u.filter; - struct filter_server_data *fsd = filter->fo_fsd; - struct obd_run_ctxt saved; - struct llog_handle *cathandle = NULL; - int rc; + struct llog_ctxt *ctxt = llh->lgh_ctxt; + struct obd_device *obd = ctxt->loc_obd; + struct obd_export *exp = obd->obd_self_export; + struct llog_cookie cookie; + struct llog_gen_rec *lgr; + struct llog_unlink_rec *lur; + struct obdo *oa; + obd_id oid; + int rc = 0; ENTRY; - - push_ctxt(&saved, &filter->fo_ctxt, NULL); - if (fsd->fsd_catalog_oid) { - struct llog_cookie catcookie; - - catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid); - catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen); - cathandle = filter_log_open(obd, &catcookie); - if (IS_ERR(cathandle)) { - CERROR("error opening catalog "LPX64":%x: rc %d\n", - catcookie.lgc_lgl.lgl_oid, - catcookie.lgc_lgl.lgl_ogen, - (int)PTR_ERR(cathandle)); - fsd->fsd_catalog_oid = 0; - fsd->fsd_catalog_ogen = 0; - } + + if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) { + CERROR("log is not plain\n"); + RETURN(-EINVAL); } - - if (!fsd->fsd_catalog_oid) { - struct llog_logid *lgl; - - cathandle = filter_log_create(obd); - if (IS_ERR(cathandle)) { - CERROR("error creating new catalog: rc %d\n", - (int)PTR_ERR(cathandle)); - GOTO(out, cathandle); - } - lgl = &cathandle->lgh_cookie.lgc_lgl; - fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid); - fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen); - rc = filter_update_server_data(obd, filter->fo_rcvd_filp, fsd); - if (rc) { - CERROR("error writing new catalog to disk: rc %d\n",rc); - GOTO(out_handle, rc); - } + if (rec->lrh_type != MDS_UNLINK_REC && + rec->lrh_type != LLOG_GEN_REC) { + CERROR("log record type error\n"); + RETURN(-EINVAL); } - - rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid); - if (rc) - GOTO(out_handle, rc); -out: - pop_ctxt(&saved, &filter->fo_ctxt, NULL); - RETURN(cathandle); - -out_handle: - filter_log_close(cathandle, cathandle); - cathandle = ERR_PTR(rc); - goto out; -} - -void filter_put_catalog(struct llog_handle *cathandle) -{ - struct llog_handle *loghandle, *n; - int rc; - ENTRY; - - list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list) - filter_log_close(cathandle, loghandle); - - rc = filp_close(cathandle->lgh_file, 0); - if (rc) - CERROR("error closing catalog: rc %d\n", rc); - - llog_free_handle(cathandle); - EXIT; -} - -int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm, - int num_cookies, struct llog_cookie *logcookies, - int flags) -{ - struct obd_device *obd = class_conn2obd(conn); - struct obd_run_ctxt saved; - int rc; - ENTRY; - - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies, - logcookies); - pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - - RETURN(rc); -} - -int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid, - obd_id oid, obd_count ogen, - struct llog_cookie *logcookie) -{ - struct llog_create_rec *lcr; - int rc; - ENTRY; - - OBD_ALLOC(lcr, sizeof(*lcr)); - if (lcr == NULL) - RETURN(-ENOMEM); - lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr); - lcr->lcr_hdr.lth_type = OST_CREATE_REC; - lcr->lcr_fid.id = mds_fid->id; - lcr->lcr_fid.generation = mds_fid->generation; - lcr->lcr_fid.f_type = mds_fid->f_type; - lcr->lcr_oid = oid; - lcr->lcr_ogen = ogen; - - rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie); - OBD_FREE(lcr, sizeof(*lcr)); - - if (rc > 0) { - LASSERT(rc == sizeof(*logcookie)); - rc = 0; + + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT; + cookie.lgc_index = le32_to_cpu(rec->lrh_index); + + if (rec->lrh_type == LLOG_GEN_REC) { + lgr = (struct llog_gen_rec *)rec; + if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen)) + rc = 0; + else + rc = LLOG_PROC_BREAK; + CWARN("fetch generation log, send cookie\n"); + llog_cancel(ctxt, 1, &cookie, 0, NULL); + RETURN(rc); } - RETURN(rc); -} - -int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid, - obd_count ogen, struct llog_cookie *logcookie) -{ - struct llog_orphan_rec *lor; - int rc; - ENTRY; - OBD_ALLOC(lor, sizeof(*lor)); - if (lor == NULL) + lur = (struct llog_unlink_rec *)rec; + oa = obdo_alloc(); + if (oa == NULL) RETURN(-ENOMEM); - lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor); - lor->lor_hdr.lth_type = OST_ORPHAN_REC; - lor->lor_oid = oid; - lor->lor_ogen = ogen; + oa->o_valid |= OBD_MD_FLCOOKIE; + oa->o_id = lur->lur_oid; + oa->o_gr = lur->lur_ogen; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie)); + oid = oa->o_id; + + rc = obd_destroy(exp, oa, NULL, NULL); + obdo_free(oa); + if (rc == -ENOENT) { + CDEBUG(D_HA, "object already removed, send cookie\n"); + llog_cancel(ctxt, 1, &cookie, 0, NULL); + RETURN(0); + } - rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie); + if (rc == 0) + CDEBUG(D_HA, "object: "LPU64" in record is destroyed\n", oid); - if (rc > 0) { - LASSERT(rc == sizeof(*logcookie)); - rc = 0; - } RETURN(rc); }