X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter_log.c;h=e32bea29856a25a57a8d18b5f6de6b6737891926;hb=6e3ec5812ebd1b5ecf7cae584f429b013ffe7431;hp=d91e1e853a3ce782143036df41e224ce39bd19f9;hpb=a85b25f04f6ba5bb45c116927c9e5d0e4639dc97;p=fs%2Flustre-release.git diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index d91e1e8..e32bea2 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -1,380 +1,304 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * linux/fs/obdfilter/filter_log.c + * GPL HEADER START * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. - * Author: Peter Braam - * Author: Andreas Dilger - * Author: Phil Schwan + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of Lustre, http://www.lustre.org. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/obdfilter/filter_log.c + * + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Phil Schwan */ #define DEBUG_SUBSYSTEM S_FILTER +#ifndef AUTOCONF_INCLUDED #include +#endif #include #include -#include -#include -#include -#include - +#include +#include +#include +#include #include "filter_internal.h" -static struct llog_handle *filter_log_create(struct obd_device *obd); - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static int filter_log_close(struct llog_handle *cathandle, - struct llog_handle *loghandle) -{ - struct llog_object_hdr *llh = loghandle->lgh_hdr; - struct file *file = loghandle->lgh_file; - struct dentry *dparent = NULL, *dchild = NULL; - struct lustre_handle parent_lockh; - struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl; - int rc; - ENTRY; - - /* If we are going to delete this log, grab a ref before we close - * it so we don't have to immediately do another lookup. */ - if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){ - CDEBUG(D_INODE, "deleting log file "LPX64":%x\n", - lgl->lgl_oid, lgl->lgl_ogen); - dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG, - lgl->lgl_oid,LCK_PW,&parent_lockh); - if (IS_ERR(dparent)) { - rc = PTR_ERR(dparent); - CERROR("error locking parent, orphan log %*s: rc %d\n", - file->f_dentry->d_name.len, - file->f_dentry->d_name.name, rc); - RETURN(rc); - } else { - dchild = dget(file->f_dentry); - llog_delete_log(cathandle, loghandle); - } - } else { - CDEBUG(D_INODE, "closing log file "LPX64":%x\n", - lgl->lgl_oid, lgl->lgl_ogen); - } - - rc = filp_close(file, 0); - - llog_free_handle(loghandle); /* also removes loghandle from list */ - - if (dchild != NULL) { - int err = vfs_unlink(dparent->d_inode, dchild); - if (err) { - CERROR("error unlinking empty log %*s: rc %d\n", - dchild->d_name.len, dchild->d_name.name, err); - if (!rc) - rc = err; - } - f_dput(dchild); - ldlm_lock_decref(&parent_lockh, LCK_PW); - } - RETURN(rc); -} - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static struct llog_handle *filter_log_open(struct obd_device *obd, - struct llog_cookie *logcookie) +int filter_log_sz_change(struct llog_handle *cathandle, + struct ll_fid *mds_fid, + __u32 ioepoch, + struct llog_cookie *logcookie, + struct inode *inode) { - struct llog_logid *lgl = &logcookie->lgc_lgl; - struct llog_handle *loghandle; - struct dentry *dchild; + struct llog_size_change_rec *lsc; int rc; + struct ost_filterdata *ofd; ENTRY; - loghandle = llog_alloc_handle(); - if (!loghandle) - RETURN(ERR_PTR(-ENOMEM)); - - dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid); - if (IS_ERR(dchild)) - GOTO(out_handle, rc = PTR_ERR(dchild)); - - if (dchild->d_inode == NULL) { - CERROR("logcookie references non-existent object %*s\n", - dchild->d_name.len, dchild->d_name.name); - GOTO(out_dentry, rc = -ENOENT); - } + LOCK_INODE_MUTEX(inode); + ofd = INODE_PRIVATE_DATA(inode); - if (dchild->d_inode->i_generation != lgl->lgl_ogen) { - CERROR("logcookie for %*s had different generation %x != %x\n", - dchild->d_name.len, dchild->d_name.name, - dchild->d_inode->i_generation, lgl->lgl_ogen); - GOTO(out_dentry, rc = -ESTALE); + if (ofd && ofd->ofd_epoch >= ioepoch) { + if (ofd->ofd_epoch > ioepoch) + CERROR("client sent old epoch %d for obj ino %ld\n", + ioepoch, inode->i_ino); + UNLOCK_INODE_MUTEX(inode); + RETURN(0); } - /* dentry_open does a dput(dchild) and mntput(mnt) on error */ - mntget(obd->u.filter.fo_vfsmnt); - loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt, - O_RDWR); - if (IS_ERR(loghandle->lgh_file)) { - rc = PTR_ERR(loghandle->lgh_file); - CERROR("error opening logfile %*s: rc %d\n", - dchild->d_name.len, dchild->d_name.name, rc); - GOTO(out_dentry, rc); + if (ofd && ofd->ofd_epoch < ioepoch) { + ofd->ofd_epoch = ioepoch; + } else if (!ofd) { + OBD_ALLOC(ofd, sizeof(*ofd)); + if (!ofd) + GOTO(out, rc = -ENOMEM); + igrab(inode); + INODE_PRIVATE_DATA(inode) = ofd; + ofd->ofd_epoch = ioepoch; } - memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie)); - loghandle->lgh_log_create = filter_log_create; - loghandle->lgh_log_open = filter_log_open; - loghandle->lgh_log_close = filter_log_close; - loghandle->lgh_obd = obd; - RETURN(loghandle); - -out_dentry: - f_dput(dchild); -out_handle: - llog_free_handle(loghandle); - RETURN(ERR_PTR(rc)); -} - -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ -static struct llog_handle *filter_log_create(struct obd_device *obd) -{ - struct filter_obd *filter = &obd->u.filter; - struct lustre_handle parent_lockh; - struct dentry *dparent, *dchild; - struct llog_handle *loghandle; - struct file *file; - int err, rc; - obd_id id; - ENTRY; + /* the decision to write a record is now made, unlock */ + UNLOCK_INODE_MUTEX(inode); - loghandle = llog_alloc_handle(); - if (!loghandle) - RETURN(ERR_PTR(-ENOMEM)); - - retry: - id = filter_next_id(filter); - - dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh); - if (IS_ERR(dparent)) - GOTO(out_ctxt, rc = PTR_ERR(dparent)); - - dchild = filter_fid2dentry(obd, dparent, S_IFREG, id); - if (IS_ERR(dchild)) - GOTO(out_lock, rc = PTR_ERR(dchild)); - - if (dchild->d_inode != NULL) { - /* This would only happen if lastobjid was bad on disk */ - CERROR("Serious error: objid %*s already exists; is this " - "filesystem corrupt? I will try to work around it.\n", - dchild->d_name.len, dchild->d_name.name); - f_dput(dchild); - ldlm_lock_decref(&parent_lockh, LCK_PW); - goto retry; - } + OBD_ALLOC(lsc, sizeof(*lsc)); + if (lsc == NULL) + RETURN(-ENOMEM); + lsc->lsc_hdr.lrh_len = lsc->lsc_tail.lrt_len = sizeof(*lsc); + lsc->lsc_hdr.lrh_type = OST_SZ_REC; + lsc->lsc_fid = *mds_fid; + lsc->lsc_ioepoch = ioepoch; - rc = vfs_create(dparent->d_inode, dchild, S_IFREG); - if (rc) { - CERROR("log create failed rc = %d\n", rc); - GOTO(out_child, rc); - } + rc = llog_cat_add_rec(cathandle, &lsc->lsc_hdr, logcookie, NULL); + OBD_FREE(lsc, sizeof(*lsc)); - rc = filter_update_server_data(obd, filter->fo_rcvd_filp, - filter->fo_fsd); - if (rc) { - CERROR("can't write lastobjid but log created: rc %d\n",rc); - GOTO(out_destroy, rc); + if (rc > 0) { + LASSERT(rc == sizeof(*logcookie)); + rc = 0; } - /* dentry_open does a dput(dchild) and mntput(mnt) on error */ - mntget(filter->fo_vfsmnt); - file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE); - if (IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("error opening log file "LPX64": rc %d\n", id, rc); - GOTO(out_destroy, rc); - } - ldlm_lock_decref(&parent_lockh, LCK_PW); - - loghandle->lgh_file = file; - loghandle->lgh_cookie.lgc_lgl.lgl_oid = id; - loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation; - loghandle->lgh_log_create = filter_log_create; - loghandle->lgh_log_open = filter_log_open; - loghandle->lgh_log_close = filter_log_close; - loghandle->lgh_obd = obd; - - RETURN(loghandle); - -out_destroy: - err = vfs_unlink(dparent->d_inode, dchild); - if (err) - CERROR("error unlinking %*s on error: rc %d\n", - dchild->d_name.len, dchild->d_name.name, err); -out_child: - f_dput(dchild); -out_lock: - ldlm_lock_decref(&parent_lockh, LCK_PW); -out_ctxt: - llog_free_handle(loghandle); - RETURN(ERR_PTR(rc)); + out: + RETURN(rc); } -/* This is called from filter_setup() and should be single threaded */ -struct llog_handle *filter_get_catalog(struct obd_device *obd) +/* When this (destroy) operation is committed, return the cancel cookie */ +void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, + void *cb_data, int error) { - struct filter_obd *filter = &obd->u.filter; - struct filter_server_data *fsd = filter->fo_fsd; - struct obd_run_ctxt saved; - struct llog_handle *cathandle = NULL; + struct llog_cookie *cookie = cb_data; + struct obd_llog_group *olg; + struct llog_ctxt *ctxt; int rc; - ENTRY; - push_ctxt(&saved, &filter->fo_ctxt, NULL); - if (fsd->fsd_catalog_oid) { - struct llog_cookie catcookie; - - catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid); - catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen); - cathandle = filter_log_open(obd, &catcookie); - if (IS_ERR(cathandle)) { - CERROR("error opening catalog "LPX64":%x: rc %d\n", - catcookie.lgc_lgl.lgl_oid, - catcookie.lgc_lgl.lgl_ogen, - (int)PTR_ERR(cathandle)); - fsd->fsd_catalog_oid = 0; - fsd->fsd_catalog_ogen = 0; - } + /* we have to find context for right group */ + if (error != 0 || obd->obd_stopping) { + CDEBUG(D_INODE, "not cancel logcookie err %d stopping %d \n", + error, obd->obd_stopping); + GOTO (out, rc = 0); } - if (!fsd->fsd_catalog_oid) { - struct llog_logid *lgl; + olg = filter_find_olg(obd, cookie->lgc_lgl.lgl_ogr); + if (!olg) { + CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr); + GOTO(out, rc = 0); + } - cathandle = filter_log_create(obd); - if (IS_ERR(cathandle)) { - CERROR("error creating new catalog: rc %d\n", - (int)PTR_ERR(cathandle)); - GOTO(out, cathandle); - } - lgl = &cathandle->lgh_cookie.lgc_lgl; - fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid); - fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen); - rc = filter_update_server_data(obd, filter->fo_rcvd_filp, fsd); - if (rc) { - CERROR("error writing new catalog to disk: rc %d\n",rc); - GOTO(out_handle, rc); - } + ctxt = llog_group_get_ctxt(olg, cookie->lgc_subsys + 1); + if (!ctxt) { + CERROR("no valid context for group "LPU64"\n", + cookie->lgc_lgl.lgl_ogr); + GOTO(out, rc = 0); } - rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid); + OBD_FAIL_TIMEOUT(OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT, 30); + + rc = llog_cancel(ctxt, NULL, 1, cookie, 0); if (rc) - GOTO(out_handle, rc); + CERROR("error cancelling log cookies: rc = %d\n", rc); + llog_ctxt_put(ctxt); out: - pop_ctxt(&saved, &filter->fo_ctxt, NULL); - RETURN(cathandle); - -out_handle: - filter_log_close(cathandle, cathandle); - cathandle = ERR_PTR(rc); - goto out; + OBD_FREE(cookie, sizeof(*cookie)); } -void filter_put_catalog(struct llog_handle *cathandle) +/* Callback for processing the unlink log record received from MDS by + * llog_client_api. */ +static int filter_recov_log_unlink_cb(struct llog_ctxt *ctxt, + struct llog_rec_hdr *rec, + struct llog_cookie *cookie) { - struct llog_handle *loghandle, *n; - int rc; + struct obd_export *exp = ctxt->loc_obd->obd_self_export; + struct llog_unlink_rec *lur; + struct obdo *oa; + obd_id oid; + obd_count count; + int rc = 0; ENTRY; - list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list) - filter_log_close(cathandle, loghandle); - - rc = filp_close(cathandle->lgh_file, 0); - if (rc) - CERROR("error closing catalog: rc %d\n", rc); + lur = (struct llog_unlink_rec *)rec; + OBDO_ALLOC(oa); + if (oa == NULL) + RETURN(-ENOMEM); + oa->o_valid |= OBD_MD_FLCOOKIE; + oa->o_id = lur->lur_oid; + oa->o_gr = lur->lur_ogr; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + oa->o_lcookie = *cookie; + oid = oa->o_id; + /* objid gap may require to destroy several objects in row */ + count = lur->lur_count + 1; + + while (count > 0) { + rc = filter_destroy(exp, oa, NULL, NULL, NULL, NULL); + if (rc == 0) + CDEBUG(D_RPCTRACE, "object "LPU64" is destroyed\n", + oid); + else if (rc != -ENOENT) + CEMERG("error destroying object "LPU64": %d\n", + oid, rc); + else + rc = 0; + count--; + oid++; + } + OBDO_FREE(oa); - llog_free_handle(cathandle); - EXIT; + RETURN(rc); } -int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm, - int num_cookies, struct llog_cookie *logcookies, - int flags) +/* Callback for processing the setattr log record received from MDS by + * llog_client_api. */ +static int filter_recov_log_setattr_cb(struct llog_ctxt *ctxt, + struct llog_rec_hdr *rec, + struct llog_cookie *cookie) { - struct obd_device *obd = class_conn2obd(conn); - struct obd_run_ctxt saved; - int rc; + struct obd_device *obd = ctxt->loc_obd; + struct obd_export *exp = obd->obd_self_export; + struct obd_info oinfo = { { { 0 } } }; + obd_id oid; + int rc = 0; ENTRY; - push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); - rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies, - logcookies); - pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); + OBDO_ALLOC(oinfo.oi_oa); + if (oinfo.oi_oa == NULL) + RETURN(-ENOMEM); - RETURN(rc); -} + if (rec->lrh_type == MDS_SETATTR_REC) { + struct llog_setattr_rec *lsr = (struct llog_setattr_rec *)rec; -int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid, - obd_id oid, obd_count ogen, - struct llog_cookie *logcookie) -{ - struct llog_create_rec *lcr; - int rc; - ENTRY; + oinfo.oi_oa->o_id = lsr->lsr_oid; + oinfo.oi_oa->o_gr = lsr->lsr_ogr; + oinfo.oi_oa->o_uid = lsr->lsr_uid; + oinfo.oi_oa->o_gid = lsr->lsr_gid; + } else { + struct llog_setattr64_rec *lsr = (struct llog_setattr64_rec *)rec; - OBD_ALLOC(lcr, sizeof(*lcr)); - if (lcr == NULL) - RETURN(-ENOMEM); - lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr); - lcr->lcr_hdr.lth_type = OST_CREATE_REC; - lcr->lcr_fid.id = mds_fid->id; - lcr->lcr_fid.generation = mds_fid->generation; - lcr->lcr_fid.f_type = mds_fid->f_type; - lcr->lcr_oid = oid; - lcr->lcr_ogen = ogen; + oinfo.oi_oa->o_id = lsr->lsr_oid; + oinfo.oi_oa->o_gr = lsr->lsr_ogr; + oinfo.oi_oa->o_uid = lsr->lsr_uid; + oinfo.oi_oa->o_gid = lsr->lsr_gid; + } - rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie); - OBD_FREE(lcr, sizeof(*lcr)); + oinfo.oi_oa->o_valid |= (OBD_MD_FLID | OBD_MD_FLUID | OBD_MD_FLGID | + OBD_MD_FLCOOKIE); + oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + oinfo.oi_oa->o_lcookie = *cookie; + oid = oinfo.oi_oa->o_id; - if (rc > 0) { - LASSERT(rc == sizeof(*logcookie)); - rc = 0; + rc = filter_setattr(exp, &oinfo, NULL); + OBDO_FREE(oinfo.oi_oa); + + if (rc == -ENOENT) { + CDEBUG(D_RPCTRACE, "object already removed, send cookie\n"); + llog_cancel(ctxt, NULL, 1, cookie, 0); + RETURN(0); } + + if (rc == 0) + CDEBUG(D_RPCTRACE, "object "LPU64" is chown/chgrp\n", oid); + RETURN(rc); } -int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid, - obd_count ogen, struct llog_cookie *logcookie) +int filter_recov_log_mds_ost_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data) { - struct llog_orphan_rec *lor; - int rc; + struct llog_ctxt *ctxt = llh->lgh_ctxt; + struct llog_cookie cookie; + int rc = 0; ENTRY; - OBD_ALLOC(lor, sizeof(*lor)); - if (lor == NULL) - RETURN(-ENOMEM); - lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor); - lor->lor_hdr.lth_type = OST_ORPHAN_REC; - lor->lor_oid = oid; - lor->lor_ogen = ogen; + if (ctxt->loc_obd->obd_stopping) + RETURN(LLOG_PROC_BREAK); - rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie); + if (rec == NULL) { + cfs_spin_lock_bh(&ctxt->loc_obd->obd_processing_task_lock); + ctxt->loc_obd->u.filter.fo_mds_ost_sync = 0; + cfs_spin_unlock_bh(&ctxt->loc_obd->obd_processing_task_lock); + RETURN(0); + } - if (rc > 0) { - LASSERT(rc == sizeof(*logcookie)); - rc = 0; + if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) { + CERROR("log is not plain\n"); + RETURN(-EINVAL); } + + OBD_FAIL_TIMEOUT(OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT, 30); + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT; + cookie.lgc_index = rec->lrh_index; + + switch (rec->lrh_type) { + case MDS_UNLINK_REC: + rc = filter_recov_log_unlink_cb(ctxt, rec, &cookie); + break; + case MDS_SETATTR_REC: + case MDS_SETATTR64_REC: + rc = filter_recov_log_setattr_cb(ctxt, rec, &cookie); + break; + case LLOG_GEN_REC: { + struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec; + if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen)) + rc = 0; + else + rc = LLOG_PROC_BREAK; + CDEBUG(D_HA, "fetch generation log, send cookie\n"); + llog_cancel(ctxt, NULL, 1, &cookie, 0); + RETURN(rc); + } + break; + default: + CERROR("log record type %08x unknown\n", rec->lrh_type); + RETURN(-EINVAL); + break; + } + RETURN(rc); }