From 342797139fe89f9efd184227cfbd7d654533685c Mon Sep 17 00:00:00 2001 From: adilger Date: Sat, 31 Jan 2004 00:33:58 +0000 Subject: [PATCH] Land b_orphan on HEAD (20040130_1601) --- lnet/include/config.h.in | 3 - lustre/ChangeLog | 3 +- lustre/configure.in | 6 -- lustre/include/linux/lustre_commit_confd.h | 3 +- lustre/include/linux/lustre_fsfilt.h | 1 + lustre/include/linux/lustre_idl.h | 45 +++++---- lustre/include/linux/lustre_log.h | 107 +++++++++++---------- lustre/include/linux/lustre_mds.h | 5 + lustre/ldlm/ldlm_lib.c | 9 +- lustre/ldlm/ldlm_lockd.c | 7 +- lustre/lov/lov_log.c | 27 +----- lustre/lvfs/fsfilt_ext3.c | 8 ++ lustre/mds/handler.c | 9 +- lustre/mds/mds_log.c | 6 +- lustre/mds/mds_lov.c | 60 +++++------- lustre/mds/mds_open.c | 3 +- lustre/mds/mds_reint.c | 7 -- lustre/mds/mds_unlink_open.c | 5 +- lustre/obdclass/llog.c | 58 +++++++---- lustre/obdclass/llog_cat.c | 149 ++++++++++++++++++++--------- lustre/obdclass/llog_ioctl.c | 134 +++++++++++++------------- lustre/obdclass/llog_lvfs.c | 56 +++++------ lustre/obdclass/llog_obd.c | 88 ++++++++--------- lustre/obdclass/llog_test.c | 60 ++++++------ lustre/obdclass/obd_config.c | 4 +- lustre/obdfilter/filter.c | 24 ++--- lustre/obdfilter/filter_log.c | 25 +++-- lustre/ost/ost_handler.c | 4 - lustre/portals/include/config.h.in | 3 - lustre/ptlrpc/llog_client.c | 1 + lustre/ptlrpc/llog_net.c | 30 +++++- lustre/ptlrpc/llog_server.c | 97 +++++++++++-------- lustre/ptlrpc/recov_thread.c | 133 +++++++++++++------------ lustre/tests/replay-single.sh | 38 +++++++- lustre/utils/wirecheck.c | 117 ++++++++++++++++++---- lustre/utils/wiretest.c | 33 +++++-- 36 files changed, 810 insertions(+), 558 deletions(-) diff --git a/lnet/include/config.h.in b/lnet/include/config.h.in index f9605ab1..f295154 100644 --- a/lnet/include/config.h.in +++ b/lnet/include/config.h.in @@ -1,8 +1,5 @@ /* portals/include/config.h.in. Generated from configure.in by autoheader. */ -/* Compile with orphan support */ -#undef ENABLE_ORPHANS - /* Use the Pinger */ #undef ENABLE_PINGER diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 6cf3e48..5588b2e 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -1,6 +1,7 @@ tbd Cluster File Systems, Inc. * version 1.2.0 * bug fixes + - orphan handling to avoid losing space on client/server crashes 2004-01-27 Cluster File Systems, Inc. * version 1.0.3 @@ -89,7 +90,7 @@ tbd Cluster File Systems, Inc. - ENOMEM detection and retry on socknal sends (2230) - use GFP_NOFS throughout Lustre, to combat ENOMEM (2230) - move osc_rpcd into ptlrpc, for use in MDC and others (2329) - - protect MDS inode fsdata with stronger locking (2313) + - protect MDS inode fsdata with stronger locking; fixes assertion (2313) - better error messages when a client is rejected during recovery (1505) - avoid cancelling locks which were never granted, after failure (2330) - fix i_sem/journal inversion in mds_client_add (2333) diff --git a/lustre/configure.in b/lustre/configure.in index 9eda4b6..66a1840 100644 --- a/lustre/configure.in +++ b/lustre/configure.in @@ -33,12 +33,6 @@ if test x$enable_pinger != xno ; then AC_DEFINE(ENABLE_PINGER, 1, Use the Pinger) fi -# very experimental orphan support -AC_ARG_ENABLE(orphans, [ --enable-orphans very experimental orphan recovery support]) -if test x$enable_orphans = xyes ; then - AC_DEFINE(ENABLE_ORPHANS, 1, Compile with orphan support) -fi - AC_ARG_WITH(obd-buffer-size, [ --with-obd-buffer-size=[size] set lctl ioctl maximum bytes (default=8192)],OBD_BUFFER_SIZE=$with_obd_buffer_size,OBD_BUFFER_SIZE=8192) AC_DEFINE_UNQUOTED(OBD_MAX_IOCTL_BUFFER, $OBD_BUFFER_SIZE, [IOCTL Buffer Size]) diff --git a/lustre/include/linux/lustre_commit_confd.h b/lustre/include/linux/lustre_commit_confd.h index a749911..6183596 100644 --- a/lustre/include/linux/lustre_commit_confd.h +++ b/lustre/include/linux/lustre_commit_confd.h @@ -28,10 +28,9 @@ struct llog_canceld_ctxt { struct list_head llcd_list; /* free or pending struct list */ - struct obd_import *llcd_import; + struct llog_ctxt *llcd_ctxt; struct llog_commit_master *llcd_lcm; int llcd_tries; /* number of tries to send */ - struct llog_ctxt_gen llcd_gen; int llcd_cookiebytes; struct llog_cookie llcd_cookies[0]; }; diff --git a/lustre/include/linux/lustre_fsfilt.h b/lustre/include/linux/lustre_fsfilt.h index 84c44ac..0d62e90 100644 --- a/lustre/include/linux/lustre_fsfilt.h +++ b/lustre/include/linux/lustre_fsfilt.h @@ -91,6 +91,7 @@ extern void fsfilt_put_ops(struct fsfilt_operations *fs_ops); #define FSFILT_OP_LINK 9 #define FSFILT_OP_CREATE_LOG 10 #define FSFILT_OP_UNLINK_LOG 11 +#define FSFILT_OP_CANCEL_UNLINK_LOG 12 static inline void *fsfilt_start(struct obd_device *obd, struct inode *inode, int op, struct obd_trans_info *oti) diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 4b6ad12..da9bd52 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -387,7 +387,7 @@ static inline struct llog_cookie *obdo_logcookie(struct obdo *oa) return (struct llog_cookie *)(oa->o_inline + sizeof(struct lustre_handle)); } -/* don't forget obdo_fid which is way down at the bottom so it can +/* don't forget obdo_fid which is way down at the bottom so it can * come after the definition of llog_cookie */ struct obd_statfs { @@ -491,12 +491,6 @@ typedef enum { #define DISP_OPEN_OPEN (1 << 5) #define DISP_ENQ_COMPLETE (1<<6) - -struct ll_uctxt { - __u32 gid1; - __u32 gid2; -}; - struct ll_fid { __u64 id; __u32 generation; @@ -824,17 +818,23 @@ extern void lustre_swab_ptlbd_rsp (struct ptlbd_rsp *r); /* * Opcodes for management/monitoring node. */ -#define MGMT_CONNECT 250 -#define MGMT_DISCONNECT 251 -#define MGMT_EXCEPTION 252 /* node died, etc. */ +typedef enum { + MGMT_CONNECT = 250, + MGMT_DISCONNECT, + MGMT_EXCEPTION, /* node died, etc. */ + MGMT_LAST_OPC +} mgmt_cmd_t; +#define MGMT_FIRST_OPC MGMT_CONNECT /* * Opcodes for multiple servers. */ -#define OBD_PING 400 -#define OBD_LOG_CANCEL 401 -#define OBD_LAST_OPC (OBD_LOG_CANCEL + 1) +typedef enum { + OBD_PING = 400, + OBD_LOG_CANCEL, + OBD_LAST_OPC +} obd_cmd_t; #define OBD_FIRST_OPC OBD_PING /* catalog of log objects */ @@ -855,6 +855,7 @@ typedef enum { MDS_UNLINK_REC = 0x10610000 | (MDS_REINT << 8) | REINT_UNLINK, OBD_CFG_REC = 0x10620000, PTL_CFG_REC = 0x10630000, + LLOG_GEN_REC = 0x10640000, LLOG_HDR_MAGIC = 0x10645539, LLOG_LOGID_MAGIC = 0x1064553a, } llog_op_type; @@ -915,6 +916,16 @@ struct llog_size_change_rec { struct llog_rec_tail lsc_tail; } __attribute__((packed)); +struct llog_gen { + __u64 mnt_cnt; + __u64 conn_cnt; +} __attribute__((packed)); + +struct llog_gen_rec { + struct llog_rec_hdr lgr_hdr; + struct llog_gen lgr_gen; + struct llog_rec_tail lgr_tail; +}; /* On-disk header structure of each log object, stored in little endian order */ #define LLOG_CHUNK_SIZE 4096 #define LLOG_HEADER_SIZE (96) @@ -935,6 +946,7 @@ struct llog_log_hdr { __u32 llh_size; __u32 llh_flags; __u32 llh_cat_idx; + /* for a catlog the first plain slot is next to it */ struct obd_uuid llh_tgtuuid; __u32 llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32) - 23]; __u32 llh_bitmap[LLOG_BITMAP_BYTES/sizeof(__u32)]; @@ -970,13 +982,8 @@ struct llogd_body { __u64 lgd_cur_offset; } __attribute__((packed)); -struct llog_ctxt_gen { - __u64 mnt_cnt; - __u64 conn_cnt; -}; - struct llogd_conn_body { - struct llog_ctxt_gen lgdc_gen; + struct llog_gen lgdc_gen; struct llog_logid lgdc_logid; __u32 lgdc_ctxt_idx; } __attribute__((packed)); diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index c3ff249..2b62378 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -42,10 +42,10 @@ snprintf(logname, sizeof(logname), "LOGS/%s", name) struct plain_handle_data { - struct list_head phd_entry; - struct llog_handle *phd_cat_handle; - struct llog_cookie phd_cookie; /* cookie of this log in its cat */ - int phd_last_idx; + struct list_head phd_entry; + struct llog_handle *phd_cat_handle; + struct llog_cookie phd_cookie; /* cookie of this log in its cat */ + int phd_last_idx; }; struct cat_handle_data { @@ -71,9 +71,10 @@ struct llog_handle { /* llog.c - general API */ typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *); -int llog_init_handle(struct llog_handle *handle, int flags, +int llog_init_handle(struct llog_handle *handle, int flags, struct obd_uuid *uuid); -int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data); +int llog_process(struct llog_handle *loghandle, llog_cb_t cb, + void *data, void *catdata); extern struct llog_handle *llog_alloc_handle(void); extern void llog_free_handle(struct llog_handle *handle); extern int llog_close(struct llog_handle *cathandle); @@ -84,26 +85,34 @@ struct llog_process_data { void *lpd_data; llog_cb_t lpd_cb; }; + +struct llog_process_cat_data { + int first_idx; + int last_idx; + /* to process catlog across zero record */ +}; + int llog_cat_put(struct llog_handle *cathandle); int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, struct llog_cookie *reccookie, void *buf); int llog_cat_cancel_records(struct llog_handle *cathandle, int count, struct llog_cookie *cookies); int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data); +int llog_cat_set_first_idx(struct llog_handle *cathandle, int index); /* llog_obd.c */ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, - int count, struct llog_logid *logid, struct llog_operations *op); + int count, struct llog_logid *logid,struct llog_operations *op); int llog_cleanup(struct llog_ctxt *); int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp); -int llog_add(struct llog_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies); +int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, + struct lov_stripe_md *lsm, struct llog_cookie *logcookies, + int numcookies); int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm, - int count, struct llog_cookie *cookies, int flags); + int count, struct llog_cookie *cookies, int flags); -int llog_obd_origin_setup(struct obd_device *obd, int index, - struct obd_device *disk_obd, int count, +int llog_obd_origin_setup(struct obd_device *obd, int index, + struct obd_device *disk_obd, int count, struct llog_logid *logid); int llog_obd_origin_cleanup(struct llog_ctxt *ctxt); int llog_obd_origin_add(struct llog_ctxt *ctxt, @@ -118,14 +127,14 @@ int obd_llog_finish(struct obd_device *obd, int count); /* llog_ioctl.c */ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data); -int llog_catlog_list(struct obd_device *obd, int count, +int llog_catlog_list(struct obd_device *obd, int count, struct obd_ioctl_data *data); /* llog_net.c */ int llog_initiator_connect(struct llog_ctxt *ctxt); int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp); int llog_origin_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen); + struct llog_logid *logid, struct llog_gen *gen); int llog_handle_connect(struct ptlrpc_request *req); /* recov_thread.c */ @@ -133,40 +142,34 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp); -int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen); +int llog_repl_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_gen *gen); struct llog_operations { int (*lop_write_rec)(struct llog_handle *loghandle, - struct llog_rec_hdr *rec, - struct llog_cookie *logcookies, - int numcookies, - void *, - int idx); + struct llog_rec_hdr *rec, + struct llog_cookie *logcookies, int numcookies, + void *, int idx); int (*lop_destroy)(struct llog_handle *handle); - int (*lop_next_block)(struct llog_handle *h, - int *curr_idx, - int next_idx, - __u64 *offset, - void *buf, - int len); + int (*lop_next_block)(struct llog_handle *h, int *curr_idx, + int next_idx, __u64 *offset, void *buf, int len); int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **, struct llog_logid *logid, char *name); int (*lop_close)(struct llog_handle *handle); int (*lop_read_header)(struct llog_handle *handle); - int (*lop_setup)(struct obd_device *obd, int ctxt_idx, - struct obd_device *disk_obd, int count, + int (*lop_setup)(struct obd_device *obd, int ctxt_idx, + struct obd_device *disk_obd, int count, struct llog_logid *logid); int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp); int (*lop_cleanup)(struct llog_ctxt *ctxt); - int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, - struct lov_stripe_md *lsm, + int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, + struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies); int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); int (*lop_connect)(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen); + struct llog_logid *logid, struct llog_gen *gen); /* XXX add 2 more: commit callbacks and llog recovery functions */ }; @@ -178,10 +181,10 @@ extern struct llog_operations llog_lvfs_ops; struct llog_ctxt { int loc_idx; /* my index the obd array of ctxt's */ - struct llog_ctxt_gen loc_gen; + struct llog_gen loc_gen; struct obd_device *loc_obd; /* points back to the containing obd*/ struct obd_export *loc_exp; - struct obd_import *loc_imp; /* to use in RPC's: can be backward + struct obd_import *loc_imp; /* to use in RPC's: can be backward pointing import */ struct llog_operations *loc_logops; struct llog_handle *loc_handle; @@ -190,20 +193,19 @@ struct llog_ctxt { void *llog_proc_cb; }; -static inline void log_gen_init(struct llog_ctxt *ctxt) +static inline void llog_gen_init(struct llog_ctxt *ctxt) { struct obd_device *obd = ctxt->loc_exp->exp_obd; if (!strcmp(obd->obd_type->typ_name, "mds")) ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count; - else if (!strstr(obd->obd_type->typ_name, "filter")) { - ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count; - } + else if (!strstr(obd->obd_type->typ_name, "filter")) + ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count; else - ctxt->loc_gen.mnt_cnt = 0; + ctxt->loc_gen.mnt_cnt = 0; } -static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b) +static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b) { if (a.mnt_cnt < b.mnt_cnt) return 1; @@ -212,15 +214,19 @@ static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b) return(a.conn_cnt < b.conn_cnt ? 1 : 0); } +#define LLOG_GEN_INC(gen) ((gen).conn_cnt) ++ +#define LLOG_PROC_BREAK 0x0001 static inline int llog_obd2ops(struct llog_ctxt *ctxt, struct llog_operations **lop) { if (ctxt == NULL) return -ENOTCONN; + *lop = ctxt->loc_logops; if (*lop == NULL) return -EOPNOTSUPP; + return 0; } @@ -229,6 +235,7 @@ static inline int llog_handle2ops(struct llog_handle *loghandle, { if (loghandle == NULL) return -EINVAL; + return llog_obd2ops(loghandle->lgh_ctxt, lop); } @@ -242,8 +249,8 @@ static inline struct llog_ctxt *llog_get_context(struct obd_device *obd, { if (index < 0 || index >= LLOG_MAX_CTXTS) return NULL; - else - return obd->obd_llog_ctxt[index]; + + return obd->obd_llog_ctxt[index]; } static inline int llog_write_rec(struct llog_handle *handle, @@ -254,7 +261,7 @@ static inline int llog_write_rec(struct llog_handle *handle, struct llog_operations *lop; int rc, buflen; ENTRY; - + rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); @@ -277,7 +284,7 @@ static inline int llog_read_header(struct llog_handle *handle) struct llog_operations *lop; int rc; ENTRY; - + rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); @@ -293,7 +300,7 @@ static inline int llog_destroy(struct llog_handle *handle) struct llog_operations *lop; int rc; ENTRY; - + rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); @@ -322,7 +329,7 @@ static inline int llog_cancel(struct obd_export *exp, rc = lop->lop_cancel(exp, lsm, count, cookies, flags); RETURN(rc); } -#endif +#endif static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx, int next_idx, __u64 *cur_offset, void *buf, @@ -343,8 +350,7 @@ static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx, RETURN(rc); } -static inline int llog_create(struct llog_ctxt *ctxt, - struct llog_handle **res, +static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res, struct llog_logid *logid, char *name) { struct llog_operations *lop; @@ -362,8 +368,7 @@ static inline int llog_create(struct llog_ctxt *ctxt, } static inline int llog_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, - struct llog_ctxt_gen *gen) + struct llog_logid *logid, struct llog_gen *gen) { struct llog_operations *lop; int rc; diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index d303272..a1f333a 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -56,6 +56,11 @@ struct lustre_md { struct lov_stripe_md *lsm; }; +struct ll_uctxt { + __u32 gid1; + __u32 gid2; +}; + struct mdc_op_data { struct ll_fid fid1; struct ll_fid fid2; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 1ca54d3..f4c0fc5 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -642,7 +642,7 @@ void target_abort_recovery(void *data) class_disconnect_exports(obd, 0); - /* when recovery was abort, cleanup orphans for mds */ + /* when recovery was aborted, cleanup orphans on mds and ost */ if (OBT(obd) && OBP(obd, postrecov)) { rc = OBP(obd, postrecov)(obd); if (rc >= 0) @@ -930,12 +930,12 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) obd->obd_name); obd->obd_recovering = 0; - /* when recovering finished, cleanup orphans for mds */ + /* when recovery finished, cleanup orphans on mds and ost */ if (OBT(obd) && OBP(obd, postrecov)) { rc2 = OBP(obd, postrecov)(obd); if (rc2 >= 0) - CWARN("%s: all clients recovered, %d MDS orphans " - "deleted\n", obd->obd_name, rc2); + CWARN("%s: all clients recovered, %d MDS " + "orphans deleted\n", obd->obd_name, rc2); else CERROR("postrecov failed %d\n", rc2); } @@ -1133,4 +1133,3 @@ void *ldlm_put_lock_into_req(struct ptlrpc_request *req, LBUG(); return NULL; } - diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 95cb950..042a383 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -855,41 +855,36 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) LASSERT(req->rq_export != NULL); LASSERT(req->rq_export->exp_obd != NULL); -#ifdef ENABLE_ORPHANS /* FIXME - how to send reply */ if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) { int rc = llog_origin_handle_cancel(req); ldlm_callback_reply(req, rc); RETURN(0); } - if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) { int rc = llog_origin_handle_create(req); req->rq_status = rc; ptlrpc_reply(req); RETURN(0); } - if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) { int rc = llog_origin_handle_next_block(req); req->rq_status = rc; ptlrpc_reply(req); RETURN(0); } - if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) { int rc = llog_origin_handle_read_header(req); req->rq_status = rc; ptlrpc_reply(req); RETURN(0); } - if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) { int rc = llog_origin_handle_close(req); ldlm_callback_reply(req, rc); RETURN(0); } -#endif + ns = req->rq_export->exp_obd->obd_namespace; LASSERT(ns != NULL); diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c index 0b9f6f3..59dc29e 100644 --- a/lustre/lov/lov_log.c +++ b/lustre/lov/lov_log.c @@ -51,26 +51,6 @@ #include "lov_internal.h" -#if 0 -static int lov_logop_cleanup(struct llog_ctxt *ctxt) -{ - struct lov_obd *lov = &ctxt->loc_obd->u.lov; - int i, rc = 0; - - ENTRY; - for (i = 0; i < lov->desc.ld_tgt_count; i++) { - struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd; - struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx); - rc = llog_cleanup(cctxt); - if (rc) { - CERROR("error lov_llog_open %d\n", i); - break; - } - } - RETURN(rc); -} -#endif - /* Add log records for each OSC that this object is striped over, and return * cookies for each one. We _would_ have nice abstraction here, except that * we need to keep cookies in stripe order, even if some are NULL, so that @@ -101,7 +81,8 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, lur->lur_oid = loi->loi_id; lur->lur_ogen = loi->loi_gr; - rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc, numcookies - rc); + rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc, + numcookies - rc); } OBD_FREE(lur, sizeof(*lur)); @@ -110,8 +91,8 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, } static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, - struct llog_ctxt_gen *gen) + struct llog_logid *logid, + struct llog_gen *gen) { struct obd_device *obd = ctxt->loc_obd; struct lov_obd *lov = &obd->u.lov; diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c index 5de8a9b..a45560a 100644 --- a/lustre/lvfs/fsfilt_ext3.c +++ b/lustre/lvfs/fsfilt_ext3.c @@ -74,6 +74,7 @@ static void *fsfilt_ext3_start(struct inode *inode, int op, void *desc_private) { /* For updates to the last recieved file */ int nblocks = EXT3_DATA_TRANS_BLOCKS; + int blocksize, block_count = 0; void *handle; if (current->journal_info) { @@ -119,6 +120,13 @@ static void *fsfilt_ext3_start(struct inode *inode, int op, void *desc_private) /* Setattr on inode */ nblocks += 1; break; + case FSFILT_OP_CANCEL_UNLINK_LOG: + blocksize = 1 << inode->i_blkbits; + block_count = (blocksize - 1) + LLOG_CHUNK_SIZE; + block_count = (block_count + blocksize - 1) >> inode->i_blkbits; + block_count = block_count * EXT3_DATA_TRANS_BLOCKS + 2; + nblocks = 2 * 2 * block_count; + break; default: CERROR("unknown transaction start op %d\n", op); LBUG(); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 8434ce0..b8ade94 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -993,7 +993,7 @@ static char *reint_names[] = { int mds_handle(struct ptlrpc_request *req) { - int should_process; + int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET; int rc = 0; struct mds_obd *mds = NULL; /* quell gcc overwarning */ struct obd_device *obd = NULL; @@ -1130,7 +1130,7 @@ int mds_handle(struct ptlrpc_request *req) break; rc = mds_reint(req, 0, NULL); - OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0); + fail = OBD_FAIL_MDS_REINT_NET_REP; break; } @@ -1253,7 +1253,7 @@ int mds_handle(struct ptlrpc_request *req) rc = req->rq_status = -ENOTCONN; } - target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET); + target_send_reply(req, rc, fail); return 0; } @@ -1438,13 +1438,12 @@ static int mds_postrecov(struct obd_device *obd) LASSERT(!obd->obd_recovering); -#ifdef ENABLE_ORPHANS rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT), obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL); if (rc != 0) { CERROR("faild at llog_origin_connect: %d\n", rc); } -#endif + rc = mds_cleanup_orphans(obd); rc2 = mds_lov_set_nextid(obd); diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index a9b02ee..549c760 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -54,7 +54,7 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count, struct llog_logid *logid, - struct llog_ctxt_gen *gen) + struct llog_gen *gen) { struct obd_device *obd = ctxt->loc_obd; struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; @@ -86,9 +86,7 @@ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, { struct mds_obd *mds = &obd->u.mds; struct lov_stripe_md *lsm = NULL; -#ifdef ENABLE_ORPHANS struct llog_ctxt *ctxt; -#endif int rc; ENTRY; @@ -101,11 +99,9 @@ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, if (rc < 0) RETURN(rc); -#ifdef ENABLE_ORPHANS ctxt = llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT); rc = llog_add(ctxt, NULL, lsm, lustre_msg_buf(repmsg, offset + 1, 0), repmsg->buflens[offset + 1] / sizeof(struct llog_cookie)); -#endif obd_free_memmd(mds->mds_osc_exp, &lsm); diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 5476684..d272b33 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -227,9 +227,9 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) } valsize = sizeof(mds->mds_lov_desc); - rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc", + rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc", &valsize, &mds->mds_lov_desc); - if (rc) + if (rc) GOTO(err_reg, rc); mds->mds_max_mdsize = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count); @@ -240,21 +240,20 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) if (rc) { CERROR("cannot read %s: rc = %d\n", "lov_objids", rc); GOTO(err_reg, rc); - } + } -#ifdef ENABLE_ORPHANS rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count); if (rc) { CERROR("failed to initialize catalog %d\n", rc); GOTO(err_reg, rc); } -#endif - /* FIXME before this set info call is made, we must initialize the logging */ + + /* FIXME before set info call is made, we must initialize logging */ rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn", 0, NULL); - if (rc) + if (rc) GOTO(err_reg, rc); - + /* If we're mounting this code for the first time on an existing FS, * we need to populate the objids array from the real OST values */ if (!mds->mds_lov_objids_valid) { @@ -279,13 +278,12 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) * it can use the obd_recovering flag to determine when the * the OBD is full available. */ if (!obd->obd_recovering) { -#ifdef ENABLE_ORPHANS rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT), - obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL); - if (rc != 0) { + obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, + NULL); + if (rc != 0) CERROR("faild at llog_origin_connect: %d\n", rc); - } -#endif + rc = mds_cleanup_orphans(obd); if (rc > 0) CERROR("Cleanup %d orphans while MDS isn't recovering\n", rc); @@ -297,12 +295,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) RETURN(rc); err_llog: -#ifdef ENABLE_ORPHANS /* cleanup all llogging subsystems */ rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count); - if (rc) + if (rc) CERROR("failed to cleanup llogging subsystems\n"); -#endif err_reg: obd_register_observer(mds->mds_osc_obd, NULL); err_discon: @@ -319,12 +315,10 @@ int mds_lov_disconnect(struct obd_device *obd, int flags) ENTRY; if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) { -#ifdef ENABLE_ORPHANS /* cleanup all llogging subsystems */ rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count); - if (rc) + if (rc) CERROR("failed to cleanup llogging subsystems\n"); -#endif obd_register_observer(mds->mds_osc_obd, NULL); @@ -358,10 +352,10 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, RETURN(-EBUSY); push_ctxt(&saved, &obd->obd_ctxt, NULL); - rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT), + rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT), &mds->mds_cfg_llh, NULL, name); if (rc == 0) - llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN, + llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN, &cfg_uuid); else mds->mds_cfg_llh = NULL; @@ -418,7 +412,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, } case OBD_IOC_PARSE: { - struct llog_ctxt *ctxt = + struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); push_ctxt(&saved, &obd->obd_ctxt, NULL); rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL); @@ -430,7 +424,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, } case OBD_IOC_DUMP_LOG: { - struct llog_ctxt *ctxt = + struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); push_ctxt(&saved, &obd->obd_ctxt, NULL); rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL); @@ -462,33 +456,29 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, RETURN(rc); } - case OBD_IOC_LLOG_CHECK: + case OBD_IOC_LLOG_CHECK: case OBD_IOC_LLOG_CANCEL: - case OBD_IOC_LLOG_REMOVE: { - struct llog_ctxt *ctxt = + case OBD_IOC_LLOG_REMOVE: { + struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); -#ifdef ENABLE_ORPHANS obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count); -#endif push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); rc = llog_ioctl(ctxt, cmd, data); pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); - -#ifdef ENABLE_ORPHANS llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count); -#endif + RETURN(rc); - } + } case OBD_IOC_LLOG_INFO: case OBD_IOC_LLOG_PRINT: { - struct llog_ctxt *ctxt = + struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - + push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); rc = llog_ioctl(ctxt, cmd, data); pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); - + RETURN(rc); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 457bbcf..f3298bf 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -1035,14 +1035,13 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd, GOTO(cleanup, rc); } -#ifdef ENABLE_ORPHANS if (req != NULL && (reply_body->valid & OBD_MD_FLEASIZE) && mds_log_op_unlink(obd, pending_child->d_inode, req->rq_repmsg, 1) > 0) { reply_body->valid |= OBD_MD_FLCOOKIE; } -#endif + pending_child->d_fsdata = (void *) &dp; dp.p_inum = 0; dp.p_ptr = req; diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 626248d..20aa83d 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -62,9 +62,7 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno, { struct mds_logcancel_data *mlcd = cb_data; struct lov_stripe_md *lsm = NULL; -#ifdef ENABLE_ORPHANS struct llog_ctxt *ctxt; -#endif int rc; obd_transno_commit_cb(obd, transno, error); @@ -79,7 +77,6 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno, (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)), rc); } else { -#ifdef ENABLE_ORPHANS ///* XXX 0 normally, SENDNOW for debug */); ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1); rc = llog_cancel(ctxt, lsm, @@ -90,7 +87,6 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno, CERROR("error cancelling %d log cookies: rc %d\n", (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)), rc); -#endif } OBD_FREE(mlcd, mlcd->mlcd_size); @@ -1184,13 +1180,10 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, cleanup_phase = 4; /* transaction */ rc = vfs_unlink(dparent->d_inode, dchild); -#ifdef ENABLE_ORPHANS - if (!rc && log_unlink) if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg, offset + 1) > 0) body->valid |= OBD_MD_FLCOOKIE; -#endif break; } case S_IFLNK: diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index 330ef32..415b133 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -139,7 +139,6 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds, oa->o_mode = body->mode & S_IFMT; oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE; -#ifdef ENABLE_ORPHANS if (body->valid & OBD_MD_FLCOOKIE) { oa->o_valid |= OBD_MD_FLCOOKIE; oti.oti_logcookies = @@ -150,7 +149,6 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds, oa->o_valid &= ~OBD_MD_FLCOOKIE; body->valid &= ~OBD_MD_FLCOOKIE; } -#endif rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti); obdo_free(oa); @@ -209,12 +207,11 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild, CERROR("error %d unlinking orphan %*s from PENDING directory\n", rc, dchild->d_name.len, dchild->d_name.name); -#ifdef ENABLE_ORPHANS if ((body->valid & OBD_MD_FLEASIZE)) { if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0) body->valid |= OBD_MD_FLCOOKIE; } -#endif + if (handle) { int err = fsfilt_commit(obd, pending_dir, handle, 0); if (err) { diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index ad7ddcd..82ceab4 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -77,7 +77,7 @@ void llog_free_handle(struct llog_handle *loghandle) } EXPORT_SYMBOL(llog_free_handle); -/* returns negative on error; 0 if success; 1 if success & log destroyed */ +/* returns negative on error; 0 if success; 1 if success & log destroyed */ int llog_cancel_rec(struct llog_handle *loghandle, int index) { struct llog_log_hdr *llh = loghandle->lgh_hdr; @@ -101,7 +101,7 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index) if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) && (le32_to_cpu(llh->llh_count) == 1) && - (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) { + (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) { rc = llog_destroy(loghandle); if (rc) CERROR("failure destroying log after last cancel: %d\n", @@ -111,7 +111,7 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index) } rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0); - if (rc) + if (rc) CERROR("failure re-writing header %d\n", rc); LASSERT(rc == 0); RETURN(rc); @@ -144,16 +144,17 @@ int llog_init_handle(struct llog_handle *handle, int flags, GOTO(out, rc); } rc = 0; - + handle->lgh_last_idx = 0; /* header is record with index 0 */ llh->llh_count = cpu_to_le32(1); /* for the header record */ llh->llh_hdr.lrh_type = cpu_to_le32(LLOG_HDR_MAGIC); - llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = cpu_to_le32(LLOG_CHUNK_SIZE); + llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = + cpu_to_le32(LLOG_CHUNK_SIZE); llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0; llh->llh_timestamp = cpu_to_le64(LTIME_S(CURRENT_TIME)); if (uuid) memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid)); - llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh), llh_bitmap)); + llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh),llh_bitmap)); ext2_set_bit(0, llh->llh_bitmap); out: @@ -165,7 +166,7 @@ out: INIT_LIST_HEAD(&handle->u.phd.phd_entry); else LBUG(); - + if (rc) { OBD_FREE(llh, sizeof(*llh)); handle->lgh_hdr = NULL; @@ -192,12 +193,14 @@ int llog_close(struct llog_handle *loghandle) } EXPORT_SYMBOL(llog_close); -int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data) +int llog_process(struct llog_handle *loghandle, llog_cb_t cb, + void *data, void *catdata) { struct llog_log_hdr *llh = loghandle->lgh_hdr; + struct llog_process_cat_data *cd = catdata; void *buf; __u64 cur_offset = LLOG_CHUNK_SIZE; - int rc = 0, index = 1; + int rc = 0, index = 1, last_index, idx; int saved_index = 0; ENTRY; @@ -205,27 +208,41 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data) if (!buf) RETURN(-ENOMEM); + if (cd != NULL) + index = cd->first_idx + 1; + if (cd != NULL && cd->last_idx) + last_index = cd->last_idx; + else + last_index = LLOG_BITMAP_BYTES * 8 - 1; + + while (rc == 0) { struct llog_rec_hdr *rec; - + /* skip records not set in bitmap */ - while (index < (LLOG_BITMAP_BYTES * 8) && + while (index <= last_index && !ext2_test_bit(index, llh->llh_bitmap)) ++index; - LASSERT(index <= LLOG_BITMAP_BYTES * 8); - if (index == LLOG_BITMAP_BYTES * 8) + LASSERT(index <= last_index + 1); + if (index == last_index + 1) break; /* get the buf with our target record; avoid old garbage */ memset(buf, 0, LLOG_CHUNK_SIZE); - rc = llog_next_block(loghandle, &saved_index, index, + rc = llog_next_block(loghandle, &saved_index, index, &cur_offset, buf, LLOG_CHUNK_SIZE); if (rc) GOTO(out, rc); rec = buf; - index = le32_to_cpu(rec->lrh_index); + idx = le32_to_cpu(rec->lrh_index); + if (idx < index) + CDEBUG(D_HA, "index %u : idx %u\n", index, idx); + while (idx < index) { + rec = ((void *)rec + le32_to_cpu(rec->lrh_len)); + idx ++; + } /* process records in buffer, starting where we found one */ while ((void *)rec < buf + LLOG_CHUNK_SIZE) { @@ -235,13 +252,20 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data) /* if set, process the callback on this record */ if (ext2_test_bit(index, llh->llh_bitmap)) { rc = cb(loghandle, rec, data); - if (rc) + if (rc == LLOG_PROC_BREAK) { + CWARN("recovery from log: "LPX64":%x" + " stopped\n", + loghandle->lgh_id.lgl_oid, + loghandle->lgh_id.lgl_ogen); + GOTO(out, rc); + } + if (rc) GOTO(out, rc); } /* next record, still in buffer? */ ++index; - if (index > LLOG_BITMAP_BYTES * 8 - 1) + if (index > last_index) GOTO(out, rc = 0); rec = ((void *)rec + le32_to_cpu(rec->lrh_len)); } diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index 4c49a10..a962797 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -52,38 +52,41 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) struct llog_handle *loghandle; struct llog_log_hdr *llh; struct llog_logid_rec rec; - int rc, index, bitmap_size, i; + int rc, index, bitmap_size; ENTRY; + llh = cathandle->lgh_hdr; + bitmap_size = sizeof(llh->llh_bitmap) * 8; + + index = (cathandle->lgh_last_idx + 1) % bitmap_size; + + /* maximum number of available slots in catlog is bitmap_size - 2 */ + if (llh->llh_cat_idx == cpu_to_le32(index)) { + CERROR("no free catalog slots for log...\n"); + RETURN(ERR_PTR(-ENOSPC)); + } else { + if (index == 0) + index = 1; + if (ext2_set_bit(index, llh->llh_bitmap)) { + CERROR("argh, index %u already set in log bitmap?\n", + index); + LBUG(); /* should never happen */ + } + cathandle->lgh_last_idx = index; + llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1); + llh->llh_tail.lrt_index = cpu_to_le32(index); + } + rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL); if (rc) RETURN(ERR_PTR(rc)); - rc = llog_init_handle(loghandle, - LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY, + rc = llog_init_handle(loghandle, + LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY, &cathandle->lgh_hdr->llh_tgtuuid); if (rc) GOTO(out_destroy, rc); - /* Find first free entry */ - llh = cathandle->lgh_hdr; - bitmap_size = sizeof(llh->llh_bitmap) * 8; - for (i = 0, index = le32_to_cpu(llh->llh_count); i < bitmap_size; - i++, index++) { - index %= bitmap_size; - if (ext2_set_bit(index, llh->llh_bitmap)) { - /* XXX This should trigger log clean up or similar */ - CERROR("catalog index %d is still in use\n", index); - } else { - cathandle->lgh_last_idx = index; - llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1); - break; - } - } - if (i == bitmap_size) { - CERROR("no free catalog slots for log...\n"); - GOTO(out_destroy, rc = -ENOSPC); - } CWARN("new recovery log "LPX64":%x for index %u of catalog "LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, index, cathandle->lgh_id.lgl_oid); @@ -96,7 +99,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) rec.lid_tail.lrt_index = cpu_to_le32(index); /* update the catalog: header and record */ - rc = llog_write_rec(cathandle, &rec.lid_hdr, + rc = llog_write_rec(cathandle, &rec.lid_hdr, &loghandle->u.phd.phd_cookie, 1, NULL, index); if (rc < 0) { GOTO(out_destroy, rc); @@ -115,7 +118,10 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) } EXPORT_SYMBOL(llog_cat_new_log); -/* Assumes caller has already pushed us into the kernel context and is locking. +/* Open an existent log handle and add it to the open list. + * This log handle will be closed when all of the records in it are removed. + * + * Assumes caller has already pushed us into the kernel context and is locking. * We return a lock on the handle to ensure nobody yanks it from us. */ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, @@ -128,7 +134,7 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, if (cathandle == NULL) RETURN(-EBADF); - list_for_each_entry(loghandle, &cathandle->u.chd.chd_head, + list_for_each_entry(loghandle, &cathandle->u.chd.chd_head, u.phd.phd_entry) { struct llog_logid *cgl = &loghandle->lgh_id; if (cgl->lgl_oid == logid->lgl_oid) { @@ -139,7 +145,6 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, continue; } loghandle->u.phd.phd_cat_handle = cathandle; - cathandle->u.chd.chd_current_log = loghandle; GOTO(out, rc = 0); } } @@ -151,15 +156,14 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, } else { rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL); if (!rc) { - list_add(&loghandle->u.phd.phd_entry, + list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); - cathandle->u.chd.chd_current_log = loghandle; } } if (!rc) { loghandle->u.phd.phd_cat_handle = cathandle; loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id; - loghandle->u.phd.phd_cookie.lgc_index = + loghandle->u.phd.phd_cookie.lgc_index = le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx); } @@ -174,7 +178,7 @@ int llog_cat_put(struct llog_handle *cathandle) int rc; ENTRY; - list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head, + list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head, u.phd.phd_entry) { int err = llog_close(loghandle); if (err) @@ -195,7 +199,7 @@ EXPORT_SYMBOL(llog_cat_put); * * NOTE: loghandle is write-locked upon successful return */ -static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, +static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, int create) { struct llog_handle *loghandle = NULL; @@ -205,7 +209,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, loghandle = cathandle->u.chd.chd_current_log; if (loghandle) { struct llog_log_hdr *llh = loghandle->lgh_hdr; - if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) { + if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) { down_write(&loghandle->lgh_lock); up_read(&cathandle->lgh_lock); RETURN(loghandle); @@ -226,7 +230,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, loghandle = cathandle->u.chd.chd_current_log; if (loghandle) { struct llog_log_hdr *llh = loghandle->lgh_hdr; - if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) { + if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) { down_write(&loghandle->lgh_lock); up_write(&cathandle->lgh_lock); RETURN(loghandle); @@ -247,7 +251,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, * Assumes caller has already pushed us into the kernel context. */ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, - struct llog_cookie *reccookie, void *buf) + struct llog_cookie *reccookie, void *buf) { struct llog_handle *loghandle; int rc; @@ -260,6 +264,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, /* loghandle is already locked by llog_cat_current_log() for us */ rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1); up_write(&loghandle->lgh_lock); + RETURN(rc); } EXPORT_SYMBOL(llog_cat_add_rec); @@ -293,15 +298,20 @@ int llog_cat_cancel_records(struct llog_handle *cathandle, int count, down_write(&loghandle->lgh_lock); rc = llog_cancel_rec(loghandle, cookies->lgc_index); up_write(&loghandle->lgh_lock); - + if (rc == 1) { /* log has been destroyed */ index = loghandle->u.phd.phd_cookie.lgc_index; if (cathandle->u.chd.chd_current_log == loghandle) cathandle->u.chd.chd_current_log = NULL; llog_free_handle(loghandle); - + LASSERT(index); + llog_cat_set_first_idx(cathandle, index); rc = llog_cancel_rec(cathandle, index); + if (rc == 0) + CDEBUG(D_HA, "cancel plain log at index %u " + "of catalog "LPX64"\n", + index, cathandle->lgh_id.lgl_oid); } } up_write(&cathandle->lgh_lock); @@ -310,7 +320,8 @@ int llog_cat_cancel_records(struct llog_handle *cathandle, int count, } EXPORT_SYMBOL(llog_cat_cancel_records); -int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, void *data) +int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, + void *data) { struct llog_process_data *d = data; struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; @@ -321,33 +332,85 @@ int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, v CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } - CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", + CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid); rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id); if (rc) { - CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid); + CERROR("Cannot find handle for log "LPX64"\n", + lir->lid_id.lgl_oid); RETURN(rc); - } + } - rc = llog_process(llh, d->lpd_cb, d->lpd_data); + rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL); RETURN(rc); } int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) { struct llog_process_data d; + struct llog_process_cat_data cd; + struct llog_log_hdr *llh = cat_llh->lgh_hdr; int rc; ENTRY; + + LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT)); d.lpd_data = data; d.lpd_cb = cb; - rc = llog_process(cat_llh, llog_cat_process_cb, &d); + if (llh->llh_cat_idx > cat_llh->lgh_last_idx) { + CWARN("catlog "LPX64" crosses index zero\n", + cat_llh->lgh_id.lgl_oid); + + cd.first_idx = le32_to_cpu(llh->llh_cat_idx); + cd.last_idx = 0; + rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); + if (rc != 0) + RETURN(rc); + + cd.first_idx = 0; + cd.last_idx = cat_llh->lgh_last_idx; + rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); + } else { + rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL); + } + RETURN(rc); } EXPORT_SYMBOL(llog_cat_process); +int llog_cat_set_first_idx(struct llog_handle *cathandle, int index) +{ + struct llog_log_hdr *llh = cathandle->lgh_hdr; + int i, bitmap_size, idx; + ENTRY; + + bitmap_size = sizeof(llh->llh_bitmap) * 8; + if (llh->llh_cat_idx == cpu_to_le32(index - 1)) { + idx = le32_to_cpu(llh->llh_cat_idx) + 1; + llh->llh_cat_idx = cpu_to_le32(idx); + if (idx == cathandle->lgh_last_idx) + goto out; + for (i = (index + 1) % bitmap_size; + i != cathandle->lgh_last_idx; + i = (i + 1) % bitmap_size) { + if (!ext2_test_bit(i, llh->llh_bitmap)) { + idx = le32_to_cpu(llh->llh_cat_idx) + 1; + llh->llh_cat_idx = cpu_to_le32(idx); + } else if (i == 0) { + llh->llh_cat_idx = 0; + } else { + break; + } + } +out: + CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n", + cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx)); + } + + RETURN(0); +} #if 0 /* Assumes caller has already pushed us into the kernel context. */ @@ -366,7 +429,7 @@ int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid) if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) { llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0); -write_hdr: +write_hdr: rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE, &offset); if (rc != LLOG_CHUNK_SIZE) { diff --git a/lustre/obdclass/llog_ioctl.c b/lustre/obdclass/llog_ioctl.c index 3a6fb7d..785739f 100644 --- a/lustre/obdclass/llog_ioctl.c +++ b/lustre/obdclass/llog_ioctl.c @@ -21,11 +21,11 @@ static int str2logid(struct llog_logid *logid, char *str, int len) { char *start, *end, *endp; - + start = str; if (*start != '#') RETURN(-EINVAL); - + start++; if (start - str >= len - 1) RETURN(-EINVAL); @@ -34,7 +34,7 @@ static int str2logid(struct llog_logid *logid, char *str, int len) RETURN(-EINVAL); *end = '\0'; - logid->lgl_oid = simple_strtoull(start, &endp, 16); + logid->lgl_oid = simple_strtoull(start, &endp, 16); if (endp != end) RETURN(-EINVAL); @@ -60,7 +60,7 @@ static int str2logid(struct llog_logid *logid, char *str, int len) RETURN(0); } -static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, +static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, void *data) { struct obd_ioctl_data *ioc_data = (struct obd_ioctl_data *)data; @@ -68,12 +68,12 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, static char *out; char *endp; int cur_index, rc = 0; - + cur_index = le32_to_cpu(rec->lrh_index); - + if (ioc_data && (ioc_data->ioc_inllen1)) { l = 0; - remains = ioc_data->ioc_inllen4 + + remains = ioc_data->ioc_inllen4 + size_round(ioc_data->ioc_inllen1) + size_round(ioc_data->ioc_inllen2) + size_round(ioc_data->ioc_inllen3); @@ -92,58 +92,58 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, } if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) { struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; - struct llog_handle *log_handle; - - if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) { - l = snprintf(out, remains, - "[index]: %05d [type]: %02x [len]: %04d failed\n", + struct llog_handle *log_handle; + + if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) { + l = snprintf(out, remains, "[index]: %05d [type]: " + "%02x [len]: %04d failed\n", cur_index, le32_to_cpu(rec->lrh_type), le32_to_cpu(rec->lrh_len)); } if (handle->lgh_ctxt == NULL) RETURN(-EOPNOTSUPP); llog_cat_id2handle(handle, &log_handle, &lir->lid_id); - rc = llog_process(log_handle, llog_check_cb, NULL); + rc = llog_process(log_handle, llog_check_cb, NULL, NULL); llog_close(log_handle); } else { switch (le32_to_cpu(rec->lrh_type)) { case OST_SZ_REC: - case OST_RAID1_REC: + case OST_RAID1_REC: case MDS_UNLINK_REC: - case OBD_CFG_REC: - case PTL_CFG_REC: - case LLOG_HDR_MAGIC: { - l = snprintf(out, remains, - "[index]: %05d [type]: %02x [len]: %04d ok\n", - cur_index, le32_to_cpu(rec->lrh_type), - le32_to_cpu(rec->lrh_len)); + case OBD_CFG_REC: + case PTL_CFG_REC: + case LLOG_HDR_MAGIC: { + l = snprintf(out, remains, "[index]: %05d [type]: " + "%02x [len]: %04d ok\n", + cur_index, le32_to_cpu(rec->lrh_type), + le32_to_cpu(rec->lrh_len)); out += l; remains -= l; if (remains <= 0) { - CERROR("not enough space for print log records\n"); + CERROR("no space to print log records\n"); RETURN(-LLOG_EEMPTY); } RETURN(0); } default: { - l = snprintf(out, remains, - "[index]: %05d [type]: %02x [len]: %04d failed\n", - cur_index, le32_to_cpu(rec->lrh_type), - le32_to_cpu(rec->lrh_len)); + l = snprintf(out, remains, "[index]: %05d [type]: " + "%02x [len]: %04d failed\n", + cur_index, le32_to_cpu(rec->lrh_type), + le32_to_cpu(rec->lrh_len)); out += l; remains -= l; if (remains <= 0) { - CERROR("not enough space for print log records\n"); + CERROR("no space to print log records\n"); RETURN(-LLOG_EEMPTY); } RETURN(0); - } + } } } RETURN(rc); } -static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, +static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, void *data) { struct obd_ioctl_data *ioc_data = (struct obd_ioctl_data *)data; @@ -151,10 +151,10 @@ static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, static char *out; char *endp; int cur_index; - + if (ioc_data->ioc_inllen1) { l = 0; - remains = ioc_data->ioc_inllen4 + + remains = ioc_data->ioc_inllen4 + size_round(ioc_data->ioc_inllen1) + size_round(ioc_data->ioc_inllen2) + size_round(ioc_data->ioc_inllen3); @@ -183,11 +183,11 @@ static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, l = snprintf(out, remains, "[index]: %05d [logid]: #%llx#%llx#%08x\n", - cur_index, lir->lid_id.lgl_oid, + cur_index, lir->lid_id.lgl_oid, lir->lid_id.lgl_ogr, lir->lid_id.lgl_ogen); } else { l = snprintf(out, remains, - "[index]: %05d [type]: %02x [len]: %04d\n", + "[index]: %05d [type]: %02x [len]: %04d\n", cur_index, le32_to_cpu(rec->lrh_type), le32_to_cpu(rec->lrh_len)); } @@ -204,7 +204,7 @@ static int llog_remove_log(struct llog_handle *cat, struct llog_logid *logid) { struct llog_handle *log; int rc, index = 0; - + down_write(&cat->lgh_lock); rc = llog_cat_id2handle(cat, &log, logid); if (rc) { @@ -212,7 +212,7 @@ static int llog_remove_log(struct llog_handle *cat, struct llog_logid *logid) logid->lgl_oid, logid->lgl_ogr, logid->lgl_ogen); GOTO(out, rc = -ENOENT); } - + index = log->u.phd.phd_cookie.lgc_index; LASSERT(index); rc = llog_destroy(log); @@ -220,6 +220,7 @@ static int llog_remove_log(struct llog_handle *cat, struct llog_logid *logid) CDEBUG(D_IOCTL, "cannot destroy log\n"); GOTO(out, rc); } + llog_cat_set_first_idx(cat, index); rc = llog_cancel_rec(cat, index); out: llog_free_handle(log); @@ -227,16 +228,17 @@ out: RETURN(rc); } -static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, - void *data) + +static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, + void *data) { struct llog_logid_rec *lir = (struct llog_logid_rec*)rec; int rc; - + if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) - return (-EINVAL); + return (-EINVAL); rc = llog_remove_log(handle, &lir->lid_id); - + RETURN(rc); } @@ -246,14 +248,14 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) struct llog_logid logid; int err = 0; struct llog_handle *handle = NULL; - + if (*data->ioc_inlbuf1 == '#') { err = str2logid(&logid, data->ioc_inlbuf1, data->ioc_inllen1); if (err) GOTO(out, err); err = llog_create(ctxt, &handle, &logid, NULL); if (err) - GOTO(out, err); + GOTO(out, err); } else if (*data->ioc_inlbuf1 == '$') { char *name = data->ioc_inlbuf1 + 1; err = llog_create(ctxt, &handle, NULL, name); @@ -264,17 +266,17 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) } err = llog_init_handle(handle, 0, NULL); - if (err) + if (err) GOTO(out_close, err = -ENOENT); - + switch (cmd) { case OBD_IOC_LLOG_INFO: { int l; - int remains = data->ioc_inllen2 + + int remains = data->ioc_inllen2 + size_round(data->ioc_inllen1); char *out = data->ioc_bulk; - l = snprintf(out, remains, + l = snprintf(out, remains, "logid: #%llx#%llx#%08x\n" "flags: %x (%s)\n" "records count: %d\n" @@ -282,20 +284,20 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) handle->lgh_id.lgl_oid, handle->lgh_id.lgl_ogr, handle->lgh_id.lgl_ogen, le32_to_cpu(handle->lgh_hdr->llh_flags), - le32_to_cpu(handle->lgh_hdr->llh_flags) & + le32_to_cpu(handle->lgh_hdr->llh_flags) & LLOG_F_IS_CAT ? "cat" : "plain", le32_to_cpu(handle->lgh_hdr->llh_count), handle->lgh_last_idx); out += l; remains -= l; - if (remains <= 0) + if (remains <= 0) CERROR("not enough space for log header info\n"); GOTO(out_close, err); } case OBD_IOC_LLOG_CHECK: { LASSERT(data->ioc_inllen1); - err = llog_process(handle, llog_check_cb, data); + err = llog_process(handle, llog_check_cb, data, NULL); if (err == -LLOG_EEMPTY) err = 0; GOTO(out_close, err); @@ -303,7 +305,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) case OBD_IOC_LLOG_PRINT: { LASSERT(data->ioc_inllen1); - err = llog_process(handle, llog_print_cb, data); + err = llog_process(handle, llog_print_cb, data, NULL); if (err == -LLOG_EEMPTY) err = 0; @@ -313,16 +315,15 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) struct llog_cookie cookie; struct llog_logid plain; char *endp; - + if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))) GOTO(out_close, err = -EINVAL); - + err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2); if (err) GOTO(out_close, err); cookie.lgc_lgl = plain; - cookie.lgc_index = simple_strtoul(data->ioc_inlbuf3, - &endp, 0); + cookie.lgc_index = simple_strtoul(data->ioc_inlbuf3, &endp, 0); if (*endp != '\0') GOTO(out_close, err = -EINVAL); @@ -331,26 +332,27 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data) } case OBD_IOC_LLOG_REMOVE: { struct llog_logid plain; - + if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))) GOTO(out_close, err = -EINVAL); - + if (data->ioc_inlbuf2) { /*remove indicate log from the catalog*/ - err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2); + err = str2logid(&plain, data->ioc_inlbuf2, + data->ioc_inllen2); if (err) GOTO(out_close, err); err = llog_remove_log(handle, &plain); } else { /*remove all the log of the catalog*/ - llog_process(handle, llog_delete_cb, NULL); + llog_process(handle, llog_delete_cb, NULL, NULL); } GOTO(out_close, err); } } - + out_close: - if (handle->lgh_hdr && + if (handle->lgh_hdr && handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) llog_cat_put(handle); else @@ -360,7 +362,7 @@ out: } EXPORT_SYMBOL(llog_ioctl); -int llog_catlog_list(struct obd_device *obd, int count, +int llog_catlog_list(struct obd_device *obd, int count, struct obd_ioctl_data *data) { int size, i; @@ -368,25 +370,25 @@ int llog_catlog_list(struct obd_device *obd, int count, char name[32] = "CATLIST"; char *out; int l, remains, rc = 0; - + size = sizeof(*idarray) * count; - + OBD_ALLOC(idarray, size); if (!idarray) RETURN(-ENOMEM); memset(idarray, 0, size); - + rc = llog_get_cat_list(obd, obd, name, count, idarray); if (rc) { OBD_FREE(idarray, size); RETURN(rc); } - + out = data->ioc_bulk; remains = data->ioc_inllen1; id = idarray; for (i = 0; i < count; i++) { - l = snprintf(out, remains, + l = snprintf(out, remains, "catalog log: #%llx#%llx#%08x\n", id->lgl_oid, id->lgl_ogr, id->lgl_ogen); id++; diff --git a/lustre/obdclass/llog_lvfs.c b/lustre/obdclass/llog_lvfs.c index 5eb2922..18a1afe 100644 --- a/lustre/obdclass/llog_lvfs.c +++ b/lustre/obdclass/llog_lvfs.c @@ -70,7 +70,7 @@ static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file, } file->f_pos += len - sizeof(rec) - sizeof(tail); - rc = fsfilt_write_record(obd, file, &tail, sizeof(tail), &file->f_pos, 0); + rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0); if (rc) { CERROR("error writing padding record: rc %d\n", rc); goto out; @@ -92,7 +92,7 @@ static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file, file->f_pos = off; if (!buf) { - rc = fsfilt_write_record(obd, file, rec, buflen, &file->f_pos, 0); + rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0); if (rc) { CERROR("error writing log record: rc %d\n", rc); goto out; @@ -147,13 +147,12 @@ static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file, static int llog_lvfs_read_header(struct llog_handle *handle) { - struct llog_rec_tail tail; struct obd_device *obd; int rc; ENTRY; LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE); - + obd = handle->lgh_ctxt->loc_exp->exp_obd; if (handle->lgh_file->f_dentry->d_inode->i_size == 0) { @@ -166,13 +165,7 @@ static int llog_lvfs_read_header(struct llog_handle *handle) if (rc) CERROR("error reading log header\n"); - rc = llog_lvfs_read_blob(obd, handle->lgh_file, &tail, sizeof(tail), - handle->lgh_file->f_dentry->d_inode->i_size - - sizeof(tail)); - if (rc) - CERROR("error reading log tail\n"); - - handle->lgh_last_idx = le32_to_cpu(tail.lrt_index); + handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index); handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size; RETURN(rc); @@ -207,7 +200,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, if (rc) RETURN(rc); - if (idx != -1) { + if (idx != -1) { loff_t saved_offset; /* no header: only allowed to insert record 1 */ @@ -224,13 +217,13 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, if (rc || idx == 0) RETURN(rc); - saved_offset = sizeof(*llh) + (idx-1) * le32_to_cpu(rec->lrh_len); + saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len); rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset); if (rc == 0 && reccookie) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = idx; rc = 1; - } + } RETURN(rc); } @@ -242,12 +235,12 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, * big enough to hold reclen, so all we care about is padding here. */ left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1)); - if (buf) - reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) + + if (buf) + reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) + sizeof(struct llog_rec_tail); /* NOTE: padding is a record, but no bit is set */ - if (left != 0 && left != reclen && + if (left != 0 && left != reclen && left < (reclen + LLOG_MIN_REC_SIZE)) { loghandle->lgh_last_idx++; rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx); @@ -268,6 +261,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, LBUG(); /* should never happen */ } llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1); + llh->llh_tail.lrt_index = cpu_to_le32(index); offset = 0; rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0); @@ -289,10 +283,13 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT; else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC) reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT; - else + else reccookie->lgc_subsys = -1; rc = 1; } + if (rc == 0 && le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC) + rc = 1; + RETURN(rc); } @@ -306,7 +303,7 @@ static void llog_skip_over(__u64 *off, int curr, int goal) { if (goal <= curr) return; - *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) & + *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) & ~(LLOG_CHUNK_SIZE - 1); } @@ -345,7 +342,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx, CERROR("Cant read llog block at log id "LPU64 "/%u offset "LPU64"\n", loghandle->lgh_id.lgl_oid, - loghandle->lgh_id.lgl_ogen, + loghandle->lgh_id.lgl_ogen, *cur_offset); RETURN(rc); } @@ -405,11 +402,11 @@ static struct file *llog_filp_open(char *name, int flags, int mode) } else { filp = l_filp_open(logname, flags, mode); if (IS_ERR(filp)) - CERROR("logfile creation %s: %ld\n", logname, + CERROR("logfile creation %s: %ld\n", logname, PTR_ERR(filp)); } - OBD_FREE(logname, PATH_MAX); + OBD_FREE(logname, PATH_MAX); return filp; } @@ -441,7 +438,7 @@ static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res, if (IS_ERR(dchild)) { rc = PTR_ERR(dchild); - CERROR("error looking up log file "LPX64":0x%x: rc %d\n", + CERROR("error looking up logfile "LPX64":0x%x: rc %d\n", logid->lgl_oid, logid->lgl_ogen, rc); GOTO(cleanup, rc); } @@ -656,14 +653,16 @@ static int llog_lvfs_read_header(struct llog_handle *handle) LBUG(); return 0; } + static int llog_lvfs_write_rec(struct llog_handle *loghandle, struct llog_rec_hdr *rec, - struct llog_cookie *reccookie, int cookiecount, + struct llog_cookie *reccookie, int cookiecount, void *buf, int idx) { LBUG(); return 0; } + static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx, int next_idx, __u64 *cur_offset, void *buf, int len) @@ -671,31 +670,34 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx, LBUG(); return 0; } -static int llog_lvfs_create(struct llog_obd_ctxt *ctxt, struct llog_handle **res, + +static int llog_lvfs_create(struct llog_obd_ctxt *ctxt,struct llog_handle **res, struct llog_logid *logid, char *name) { LBUG(); return 0; } + static int llog_lvfs_close(struct llog_handle *handle) { LBUG(); return 0; } + static int llog_lvfs_destroy(struct llog_handle *handle) { LBUG(); return 0; } -int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd, +int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd, char *name, int count, struct llog_logid *idarray) { LBUG(); return 0; } -int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd, +int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd, char *name, int count, struct llog_logid *idarray) { LBUG(); diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 9c9abb7..ea68bb3 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -26,7 +26,7 @@ /* helper functions for calling the llog obd methods */ -int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, +int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, int count, struct llog_logid *logid, struct llog_operations *op) { int rc = 0; @@ -49,7 +49,7 @@ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, if (op->lop_setup) rc = op->lop_setup(obd, index, disk_obd, count, logid); - if (ctxt && rc) + if (ctxt && rc) OBD_FREE(ctxt, sizeof(*ctxt)); RETURN(rc); @@ -61,7 +61,6 @@ int llog_cleanup(struct llog_ctxt *ctxt) int rc = 0; ENTRY; - down(&ctxt->loc_sem); LASSERT(ctxt); if (CTXTP(ctxt, cleanup)) @@ -70,7 +69,6 @@ int llog_cleanup(struct llog_ctxt *ctxt) ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL; class_export_put(ctxt->loc_exp); ctxt->loc_exp = NULL; - up(&ctxt->loc_sem); OBD_FREE(ctxt, sizeof(*ctxt)); RETURN(rc); @@ -84,29 +82,25 @@ int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp) if (!ctxt) RETURN(0); - down(&ctxt->loc_sem); - if (ctxt->loc_llcd && CTXTP(ctxt, sync)) + + if (CTXTP(ctxt, sync)) rc = CTXTP(ctxt, sync)(ctxt, exp); - else - up(&ctxt->loc_sem); RETURN(rc); } EXPORT_SYMBOL(llog_sync); -int llog_add(struct llog_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies) +int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, + struct lov_stripe_md *lsm, struct llog_cookie *logcookies, + int numcookies) { int rc; ENTRY; LASSERT(ctxt); - down(&ctxt->loc_sem); CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP); rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies); - up(&ctxt->loc_sem); RETURN(rc); } EXPORT_SYMBOL(llog_add); @@ -125,7 +119,7 @@ int llog_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, EXPORT_SYMBOL(llog_cancel); /* callback func for llog_process in llog_obd_origin_setup */ -static int cat_cancel_cb(struct llog_handle *cathandle, +static int cat_cancel_cb(struct llog_handle *cathandle, struct llog_rec_hdr *rec, void *data) { struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; @@ -138,35 +132,36 @@ static int cat_cancel_cb(struct llog_handle *cathandle, CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } - CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", + CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid); rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id); if (rc) { - CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid); + CERROR("Cannot find handle for log "LPX64"\n", + lir->lid_id.lgl_oid); RETURN(rc); - } - + } + llh = loghandle->lgh_hdr; if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) && (le32_to_cpu(llh->llh_count) == 1)) { rc = llog_destroy(loghandle); if (rc) - CERROR("failure destroying log during postsetup: %d\n", rc); + CERROR("failure destroying log in postsetup: %d\n", rc); LASSERT(rc == 0); index = loghandle->u.phd.phd_cookie.lgc_index; - if (cathandle->u.chd.chd_current_log == loghandle) - cathandle->u.chd.chd_current_log = NULL; llog_free_handle(loghandle); - + LASSERT(index); + llog_cat_set_first_idx(cathandle, index); rc = llog_cancel_rec(cathandle, index); if (rc == 0) - CWARN("cancel log "LPX64":%x at index %u of catalog "LPX64"\n", - lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, - le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid); + CWARN("cancel log "LPX64":%x at index %u of catalog " + LPX64"\n", lir->lid_id.lgl_oid, + lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index), + cathandle->lgh_id.lgl_oid); } RETURN(rc); @@ -174,8 +169,9 @@ static int cat_cancel_cb(struct llog_handle *cathandle, /* lop_setup method for filter/osc */ // XXX how to set exports -int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, - int count, struct llog_logid *logid) +int llog_obd_origin_setup(struct obd_device *obd, int index, + struct obd_device *disk_obd, int count, + struct llog_logid *logid) { struct llog_ctxt *ctxt; struct llog_handle *handle; @@ -187,20 +183,19 @@ int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device * RETURN(0); LASSERT(count == 1); - + ctxt = llog_get_context(obd, index); LASSERT(ctxt); - log_gen_init(ctxt); + llog_gen_init(ctxt); - down(&ctxt->loc_sem); if (logid->lgl_oid) rc = llog_create(ctxt, &handle, logid, NULL); else { rc = llog_create(ctxt, &handle, NULL, NULL); - if (!rc) + if (!rc) *logid = handle->lgh_id; } - if (rc) + if (rc) GOTO(out, rc); ctxt->loc_handle = handle; @@ -210,11 +205,10 @@ int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device * if (rc) GOTO(out, rc); - rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL); - if (rc) + rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL); + if (rc) CERROR("llog_process with cat_cancel_cb failed: %d\n", rc); out: - up(&ctxt->loc_sem); if (ctxt && rc) { obd->obd_llog_ctxt[index] = NULL; OBD_FREE(ctxt, sizeof(*ctxt)); @@ -229,32 +223,34 @@ int llog_obd_origin_cleanup(struct llog_ctxt *ctxt) struct llog_log_hdr *llh; int rc, index; ENTRY; - + if (!ctxt) return 0; cathandle = ctxt->loc_handle; if (cathandle) { - list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head, - u.phd.phd_entry) { + list_for_each_entry_safe(loghandle, n, + &cathandle->u.chd.chd_head, + u.phd.phd_entry) { llh = loghandle->lgh_hdr; - if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) && + if ((le32_to_cpu(llh->llh_flags) & + LLOG_F_ZAP_WHEN_EMPTY) && (le32_to_cpu(llh->llh_count) == 1)) { rc = llog_destroy(loghandle); if (rc) - CERROR("failure destroying log during cleanup: %d\n", - rc); + CERROR("failure destroying log during " + "cleanup: %d\n", rc); LASSERT(rc == 0); index = loghandle->u.phd.phd_cookie.lgc_index; - if (cathandle->u.chd.chd_current_log == loghandle) - cathandle->u.chd.chd_current_log = NULL; llog_free_handle(loghandle); - + LASSERT(index); + llog_cat_set_first_idx(cathandle, index); rc = llog_cancel_rec(cathandle, index); if (rc == 0) - CWARN("cancel plain log at index %u of catalog "LPX64"\n", + CWARN("cancel plain log at index %u " + "of catalog "LPX64"\n", index, cathandle->lgh_id.lgl_oid); } } @@ -314,7 +310,7 @@ int llog_cat_initialize(struct obd_device *obd, int count) CERROR("rc: %d\n", rc); GOTO(out, rc); } - + out: OBD_FREE(idarray, size); RETURN(rc); diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 16606574..0607d12 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -86,7 +86,7 @@ static int llog_test_1(struct obd_device *obd, char *name) int rc2; ENTRY; - CERROR("1a: create a log with name: %s\n", name); + CWARN("1a: create a log with name: %s\n", name); LASSERT(ctxt); rc = llog_create(ctxt, &llh, NULL, name); @@ -100,7 +100,7 @@ static int llog_test_1(struct obd_device *obd, char *name) GOTO(out, rc); out: - CERROR("1b: close newly-created log\n"); + CWARN("1b: close newly-created log\n"); rc2 = llog_close(llh); if (rc2) { CERROR("1b: close log %s failed: %d\n", name, rc2); @@ -120,7 +120,7 @@ static int llog_test_2(struct obd_device *obd, char *name, struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT); ENTRY; - CERROR("2a: re-open a log with name: %s\n", name); + CWARN("2a: re-open a log with name: %s\n", name); rc = llog_create(ctxt, llh, NULL, name); if (rc) { CERROR("2a: re-open log with name %s failed: %d\n", name, rc); @@ -131,7 +131,7 @@ static int llog_test_2(struct obd_device *obd, char *name, if ((rc = verify_handle("2", *llh, 1))) RETURN(rc); - CERROR("2b: create a log without specified NAME & LOGID\n"); + CWARN("2b: create a log without specified NAME & LOGID\n"); rc = llog_create(ctxt, &loghandle, NULL, NULL); if (rc) { CERROR("2b: create log failed\n"); @@ -141,7 +141,7 @@ static int llog_test_2(struct obd_device *obd, char *name, logid = loghandle->lgh_id; llog_close(loghandle); - CERROR("2b: re-open the log by LOGID\n"); + CWARN("2b: re-open the log by LOGID\n"); rc = llog_create(ctxt, &loghandle, &logid, NULL); if (rc) { CERROR("2b: re-open log by LOGID failed\n"); @@ -149,7 +149,7 @@ static int llog_test_2(struct obd_device *obd, char *name, } llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid); - CERROR("2b: destroy this log\n"); + CWARN("2b: destroy this log\n"); rc = llog_destroy(loghandle); if (rc) { CERROR("2b: destroy log failed\n"); @@ -171,7 +171,7 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh) lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = cpu_to_le32(sizeof(lcr)); lcr.lcr_hdr.lrh_type = cpu_to_le32(OST_SZ_REC); - CERROR("3a: write one create_rec\n"); + CWARN("3a: write one create_rec\n"); rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1); num_recs++; if (rc) { @@ -182,7 +182,7 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh) if ((rc = verify_handle("3a", llh, num_recs))) RETURN(rc); - CERROR("3b: write 10 cfg log records with 8 bytes bufs\n"); + CWARN("3b: write 10 cfg log records with 8 bytes bufs\n"); for (i = 0; i < 10; i++) { struct llog_rec_hdr hdr; char buf[8]; @@ -203,7 +203,7 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh) if ((rc = verify_handle("3b", llh, num_recs))) RETURN(rc); - CERROR("3c: write 1000 more log records\n"); + CWARN("3c: write 1000 more log records\n"); for (i = 0; i < 1000; i++) { rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1); if (rc) { @@ -242,7 +242,7 @@ static int llog_test_4(struct obd_device *obd) lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00); sprintf(name, "%x", llog_test_rand+1); - CERROR("4a: create a catalog log with name: %s\n", name); + CWARN("4a: create a catalog log with name: %s\n", name); rc = llog_create(ctxt, &cath, NULL, name); if (rc) { CERROR("1a: llog_create with name %s failed: %d\n", name, rc); @@ -252,7 +252,7 @@ static int llog_test_4(struct obd_device *obd) num_recs++; cat_logid = cath->lgh_id; - CERROR("4b: write 1 record into the catalog\n"); + CWARN("4b: write 1 record into the catalog\n"); rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, &cookie, NULL); if (rc != 1) { CERROR("4b: write 1 catalog record failed at: %d\n", rc); @@ -265,7 +265,7 @@ static int llog_test_4(struct obd_device *obd) if ((rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs))) RETURN(rc); - CERROR("4c: cancel 1 log record\n"); + CWARN("4c: cancel 1 log record\n"); rc = llog_cat_cancel_records(cath, 1, &cookie); if (rc) { CERROR("4c: cancel 1 catalog based record failed: %d\n", rc); @@ -276,7 +276,7 @@ static int llog_test_4(struct obd_device *obd) if ((rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs))) RETURN(rc); - CERROR("4d: write 40,000 more log records\n"); + CWARN("4d: write 40,000 more log records\n"); for (i = 0; i < 40000; i++) { rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, NULL, NULL); if (rc) { @@ -287,7 +287,7 @@ static int llog_test_4(struct obd_device *obd) num_recs++; } - CERROR("4e: add 5 large records, one record per block\n"); + CWARN("4e: add 5 large records, one record per block\n"); buflen = LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) - sizeof(struct llog_rec_tail); OBD_ALLOC(buf, buflen); @@ -308,7 +308,7 @@ static int llog_test_4(struct obd_device *obd) OBD_FREE(buf, buflen); out: - CERROR("4f: put newly-created catalog\n"); + CWARN("4f: put newly-created catalog\n"); rc = llog_cat_put(cath); if (rc) CERROR("1b: close log %s failed: %d\n", name, rc); @@ -325,7 +325,7 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, RETURN(-EINVAL); } - CERROR("seeing record at index %d - "LPX64":%x in log "LPX64"\n", + CWARN("seeing record at index %d - "LPX64":%x in log "LPX64"\n", le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, llh->lgh_id.lgl_oid); RETURN(0); @@ -334,12 +334,12 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, static int plain_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, void *data) { - if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) { + if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) { CERROR("log is not plain\n"); RETURN(-EINVAL); } - CERROR("seeing record at index %d in log "LPX64"\n", + CWARN("seeing record at index %d in log "LPX64"\n", le32_to_cpu(rec->lrh_index), llh->lgh_id.lgl_oid); RETURN(0); } @@ -350,7 +350,7 @@ static int llog_cancel_rec_cb(struct llog_handle *llh, struct llog_rec_hdr *rec, struct llog_cookie cookie; static int i = 0; - if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) { + if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) { CERROR("log is not plain\n"); RETURN(-EINVAL); } @@ -382,7 +382,7 @@ static int llog_test_5(struct obd_device *obd) cpu_to_le32(LLOG_MIN_REC_SIZE); lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00); - CERROR("5a: re-open catalog by id\n"); + CWARN("5a: re-open catalog by id\n"); rc = llog_create(ctxt, &llh, &cat_logid, NULL); if (rc) { CERROR("5a: llog_create with logid failed: %d\n", rc); @@ -390,21 +390,21 @@ static int llog_test_5(struct obd_device *obd) } llog_init_handle(llh, LLOG_F_IS_CAT, &uuid); - CERROR("5b: print the catalog entries.. we expect 2\n"); - rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5"); + CWARN("5b: print the catalog entries.. we expect 2\n"); + rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5", NULL); if (rc) { CERROR("5b: process with cat_print_cb failed: %d\n", rc); GOTO(out, rc); } - CERROR("5c: Cancel 40000 records, see one log zapped\n"); + CWARN("5c: Cancel 40000 records, see one log zapped\n"); rc = llog_cat_process(llh, llog_cancel_rec_cb, "foobar"); if (rc != -4711) { CERROR("5c: process with cat_cancel_cb failed: %d\n", rc); GOTO(out, rc); } - CERROR("5d: add 1 record to the log with many canceled empty pages\n"); + CWARN("5d: add 1 record to the log with many canceled empty pages\n"); rc = llog_cat_add_rec(llh, &lmr.lmr_hdr, NULL, NULL); if (rc) { CERROR("5d: add record to the log with many canceled empty\ @@ -412,14 +412,14 @@ static int llog_test_5(struct obd_device *obd) GOTO(out, rc); } - CERROR("5b: print the catalog entries.. we expect 1\n"); - rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5"); + CWARN("5b: print the catalog entries.. we expect 1\n"); + rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5", NULL); if (rc) { CERROR("5b: process with cat_print_cb failed: %d\n", rc); GOTO(out, rc); } - CERROR("5e: print plain log entries.. expect 6\n"); + CWARN("5e: print plain log entries.. expect 6\n"); rc = llog_cat_process(llh, plain_print_cb, "foobar"); if (rc) { CERROR("5e: process with plain_print_cb failed: %d\n", rc); @@ -427,7 +427,7 @@ static int llog_test_5(struct obd_device *obd) } out: - CERROR("5: close re-opened catalog\n"); + CWARN("5: close re-opened catalog\n"); if (llh) rc = llog_cat_put(llh); if (rc) @@ -448,7 +448,7 @@ static int llog_test_6(struct obd_device *obd, char *name) struct llog_ctxt *nctxt; int rc; - CERROR("6a: re-open log %s using client API\n", name); + CWARN("6a: re-open log %s using client API\n", name); mdc_obd = class_find_client_obd(mds_uuid, LUSTRE_MDC_NAME, NULL); if (mdc_obd == NULL) { CERROR("6: no MDC devices connected to %s found.\n", @@ -476,7 +476,7 @@ static int llog_test_6(struct obd_device *obd, char *name) GOTO(parse_out, rc); } - rc = llog_process(llh, (llog_cb_t)plain_print_cb, NULL); + rc = llog_process(llh, (llog_cb_t)plain_print_cb, NULL, NULL); if (rc) CERROR("6: llog_process failed %d\n", rc); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index d34dffc..931d5d3 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -623,7 +623,7 @@ int class_config_parse_llog(struct llog_ctxt *ctxt, char *name, if (rc) GOTO(parse_out, rc); - rc = llog_process(llh, class_config_llog_handler, cfg); + rc = llog_process(llh, class_config_llog_handler, cfg, NULL); parse_out: rc2 = llog_close(llh); if (rc == 0) @@ -720,7 +720,7 @@ int class_config_dump_llog(struct llog_ctxt *ctxt, char *name, if (rc) GOTO(parse_out, rc); - rc = llog_process(llh, class_config_dump_handler, cfg); + rc = llog_process(llh, class_config_dump_handler, cfg, NULL); parse_out: rc2 = llog_close(llh); if (rc == 0) diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 93d363e..a9b09fd 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1181,11 +1181,9 @@ static int filter_postsetup(struct obd_device *obd) ENTRY; // XXX add a storage location for the logid for size changes -#ifdef ENABLE_ORPHANS rc = llog_cat_initialize(obd, 1); if (rc) CERROR("failed to setup llogging subsystems\n"); -#endif RETURN(rc); } @@ -1317,11 +1315,9 @@ static int filter_precleanup(struct obd_device *obd, int flags) int rc = 0; ENTRY; -#ifdef ENABLE_ORPHANS rc = obd_llog_finish(obd, 0); if (rc) CERROR("failed to cleanup llogging subsystem\n"); -#endif RETURN(rc); } @@ -1806,9 +1802,15 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa, cleanup: switch(cleanup_phase) { case 3: - if (fcc != NULL) - fsfilt_add_journal_cb(obd, 0, oti->oti_handle, - filter_cancel_cookies_cb, fcc); + if (fcc != NULL) { + if (oti != NULL) { + fsfilt_add_journal_cb(obd, 0, oti->oti_handle, + filter_cancel_cookies_cb, fcc); + } else { + fsfilt_add_journal_cb(obd, 0, handle, + filter_cancel_cookies_cb, fcc); + } + } rc = filter_finish_transno(exp, oti, rc); rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0); if (rc2) { @@ -1863,6 +1865,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, struct obd_run_ctxt saved; struct filter_obd *filter; struct dentry *dentry; + struct llog_ctxt *ctxt; int rc, rc2; ENTRY; @@ -1871,6 +1874,9 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, /* an objid of zero is taken to mean "sync whole filesystem" */ if (!oa || !(oa->o_valid & OBD_MD_FLID)) { rc = fsfilt_sync(exp->exp_obd, filter->fo_sb); + /* flush any remaining cancel messages out to the target */ + ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT); + llog_sync(ctxt, exp); RETURN(rc); } @@ -1955,9 +1961,7 @@ static int filter_set_info(struct obd_export *exp, __u32 keylen, { struct obd_device *obd; struct lustre_handle conn; -#ifdef ENABLE_ORPHANS struct llog_ctxt *ctxt; -#endif int rc = 0; ENTRY; @@ -1976,10 +1980,8 @@ static int filter_set_info(struct obd_export *exp, __u32 keylen, CWARN("Received MDS connection ("LPX64")\n", conn.cookie); memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn)); -#ifdef ENABLE_ORPHANS ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT); rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse); -#endif RETURN(rc); } diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index b63c4c2..686fd30 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -113,18 +113,19 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh, struct obd_device *obd = ctxt->loc_obd; struct obd_export *exp = obd->obd_self_export; struct llog_cookie cookie; + struct llog_gen_rec *lgr; struct llog_unlink_rec *lur; struct obdo *oa; - struct obd_trans_info oti = { 0 }; obd_id oid; int rc = 0; ENTRY; - if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) { + if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) { CERROR("log is not plain\n"); RETURN(-EINVAL); } - if (rec->lrh_type != MDS_UNLINK_REC) { + if (rec->lrh_type != MDS_UNLINK_REC && + rec->lrh_type != LLOG_GEN_REC) { CERROR("log record type error\n"); RETURN(-EINVAL); } @@ -132,9 +133,19 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh, cookie.lgc_lgl = llh->lgh_id; cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT; cookie.lgc_index = le32_to_cpu(rec->lrh_index); - + + if (rec->lrh_type == LLOG_GEN_REC) { + lgr = (struct llog_gen_rec *)rec; + if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen)) + rc = 0; + else + rc = LLOG_PROC_BREAK; + CWARN("fetch generation log, send cookie\n"); + llog_cancel(ctxt, NULL, 1, &cookie, 0); + RETURN(rc); + } + lur = (struct llog_unlink_rec *)rec; - oa = obdo_alloc(); if (oa == NULL) RETURN(-ENOMEM); @@ -144,10 +155,10 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh, memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie)); oid = oa->o_id; - rc = obd_destroy(exp, oa, NULL, &oti); + rc = obd_destroy(exp, oa, NULL, NULL); obdo_free(oa); if (rc == -ENOENT) { - CWARN("object already removed: send cookie\n"); + CWARN("object already removed, send cookie\n"); llog_cancel(ctxt, NULL, 1, &cookie, 0); RETURN(0); } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index a83592f..67120d7 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1029,7 +1029,6 @@ static int ost_handle(struct ptlrpc_request *req) DEBUG_REQ(D_INODE, req, "ping"); rc = target_handle_ping(req); break; -#ifdef ENABLE_ORPHANS /* FIXME - just reply status */ case LLOG_ORIGIN_CONNECT: DEBUG_REQ(D_INODE, req, "log connect\n"); @@ -1039,7 +1038,6 @@ static int ost_handle(struct ptlrpc_request *req) if (rc) RETURN(rc); RETURN(ptlrpc_reply(req)); - //break; case OBD_LOG_CANCEL: CDEBUG(D_INODE, "log cancel\n"); OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0); @@ -1049,8 +1047,6 @@ static int ost_handle(struct ptlrpc_request *req) if (rc) RETURN(rc); RETURN(ptlrpc_reply(req)); - //break; -#endif case LDLM_ENQUEUE: CDEBUG(D_INODE, "enqueue\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); diff --git a/lustre/portals/include/config.h.in b/lustre/portals/include/config.h.in index f9605ab1..f295154 100644 --- a/lustre/portals/include/config.h.in +++ b/lustre/portals/include/config.h.in @@ -1,8 +1,5 @@ /* portals/include/config.h.in. Generated from configure.in by autoheader. */ -/* Compile with orphan support */ -#undef ENABLE_ORPHANS - /* Use the Pinger */ #undef ENABLE_PINGER diff --git a/lustre/ptlrpc/llog_client.c b/lustre/ptlrpc/llog_client.c index 5524843..8accba6 100644 --- a/lustre/ptlrpc/llog_client.c +++ b/lustre/ptlrpc/llog_client.c @@ -194,6 +194,7 @@ static int llog_client_read_header(struct llog_handle *handle) GOTO(out, rc =-EFAULT); } memcpy(handle->lgh_hdr, hdr, sizeof (*hdr)); + handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index); out: if (req) diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c index 1dd2f9a..0694bd1 100644 --- a/lustre/ptlrpc/llog_net.c +++ b/lustre/ptlrpc/llog_net.c @@ -45,9 +45,9 @@ #ifdef __KERNEL__ int llog_origin_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, - struct llog_ctxt_gen *gen) + struct llog_logid *logid, struct llog_gen *gen) { + struct llog_gen_rec *lgr; struct obd_import *imp; struct ptlrpc_request *request; struct llogd_conn_body *req_body; @@ -55,11 +55,31 @@ int llog_origin_connect(struct llog_ctxt *ctxt, int count, int rc; ENTRY; + if (list_empty(&ctxt->loc_handle->u.chd.chd_head)) { + CDEBUG(D_HA, "there is no record related to ctxt %p", ctxt); + RETURN(0); + } + + /* FIXME what value for gen->conn_cnt */ + LLOG_GEN_INC(ctxt->loc_gen); + + /* first add llog_gen_rec */ + OBD_ALLOC(lgr, sizeof(*lgr)); + if (!lgr) + RETURN(-ENOMEM); + lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr); + lgr->lgr_hdr.lrh_type = LLOG_GEN_REC; + lgr->lgr_gen = ctxt->loc_gen; + rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1); + OBD_FREE(lgr, sizeof(*lgr)); + if (rc != 1) + RETURN(rc); + LASSERT(ctxt->loc_imp); imp = ctxt->loc_imp; request = ptlrpc_prep_req(imp, LLOG_ORIGIN_CONNECT, 1, &size, NULL); - if (!request) + if (!request) RETURN(-ENOMEM); req_body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*req_body)); @@ -87,9 +107,9 @@ int llog_handle_connect(struct ptlrpc_request *req) req_body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*req_body)); ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx); - rc = llog_connect(ctxt, 1, &req_body->lgdc_logid, + rc = llog_connect(ctxt, 1, &req_body->lgdc_logid, &req_body->lgdc_gen); - if (rc != 0) + if (rc != 0) CERROR("failed at llog_relp_connect\n"); RETURN(rc); diff --git a/lustre/ptlrpc/llog_server.c b/lustre/ptlrpc/llog_server.c index b0b739e..4d9e68c 100644 --- a/lustre/ptlrpc/llog_server.c +++ b/lustre/ptlrpc/llog_server.c @@ -34,6 +34,7 @@ #include #include #include +#include int llog_origin_handle_create(struct ptlrpc_request *req) { @@ -238,16 +239,17 @@ int llog_origin_handle_close(struct ptlrpc_request *req) RETURN(rc); } -#ifdef ENABLE_ORPHANS int llog_origin_handle_cancel(struct ptlrpc_request *req) { struct obd_device *obd = req->rq_export->exp_obd; struct obd_device *disk_obd; struct llog_cookie *logcookies; struct llog_ctxt *ctxt; - int num_cookies, rc = 0; + int num_cookies, rc = 0, err, i; struct obd_run_ctxt saved; struct llog_handle *cathandle; + struct inode *inode; + void *handle; ENTRY; logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies)); @@ -262,27 +264,43 @@ int llog_origin_handle_cancel(struct ptlrpc_request *req) CWARN("llog subsys not setup or already cleanup\n"); RETURN(-ENOENT); } - down(&ctxt->loc_sem); - disk_obd = ctxt->loc_exp->exp_obd; - cathandle = ctxt->loc_handle; - LASSERT(cathandle); + disk_obd = ctxt->loc_exp->exp_obd; push_ctxt(&saved, &disk_obd->obd_ctxt, NULL); - rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies); + for (i = 0; i < num_cookies; i++, logcookies++) { + cathandle = ctxt->loc_handle; + LASSERT(cathandle != NULL); + inode = cathandle->lgh_file->f_dentry->d_inode; + + handle = fsfilt_start(disk_obd, inode, + FSFILT_OP_CANCEL_UNLINK_LOG, NULL); + if (IS_ERR(handle)) { + CERROR("fsfilt_start failed: %ld\n", PTR_ERR(handle)); + GOTO(pop_ctxt, rc = PTR_ERR(handle)); + } + + rc = llog_cat_cancel_records(cathandle, 1, logcookies); + + err = fsfilt_commit(disk_obd, inode, handle, 0); + if (err) { + CERROR("error committing transaction: %d\n", err); + if (!rc) + rc = err; + GOTO(pop_ctxt, rc); + } + } +pop_ctxt: + pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); if (rc) CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc); else - CWARN("cancel %d llog-records\n", num_cookies); - - pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); - up(&ctxt->loc_sem); + CDEBUG(D_HA, "cancel %d llog-records\n", num_cookies); RETURN(rc); } EXPORT_SYMBOL(llog_origin_handle_cancel); -#endif -static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len, +static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len, char *client) { struct mds_obd *mds = &obd->u.mds; @@ -292,17 +310,17 @@ static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len, char name[4][64]; int rc, i, l, remains = buf_len; char *out = buf; - + if (ctxt == NULL || mds == NULL) RETURN(-EOPNOTSUPP); push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); - + sprintf(name[0], "%s", mds->mds_profile); sprintf(name[1], "%s-clean", mds->mds_profile); sprintf(name[2], "%s", client); sprintf(name[3], "%s-clean", client); - + for (i = 0; i < 4; i++) { int index, uncanceled = 0; rc = llog_create(ctxt, &handle, NULL, name[i]); @@ -313,18 +331,17 @@ static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len, llog_close(handle); GOTO(out_pop, rc = -ENOENT); } - + for (index = 1; index < (LLOG_BITMAP_BYTES * 8); index ++) { if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap)) uncanceled++; } - + l = snprintf(out, remains, "[Log Name]: %s\nLog Size: %llu\n" "Last Index: %d\nUncanceled Records: %d\n\n", - name[i], + name[i], handle->lgh_file->f_dentry->d_inode->i_size, - handle->lgh_last_idx, - uncanceled); + handle->lgh_last_idx, uncanceled); out += l; remains -= l; @@ -344,7 +361,7 @@ struct cb_data { int init; }; -static int llog_catinfo_cb(struct llog_handle *cat, +static int llog_catinfo_cb(struct llog_handle *cat, struct llog_rec_hdr *rec, void *data) { static char *out = NULL; @@ -365,7 +382,7 @@ static int llog_catinfo_cb(struct llog_handle *cat, if (!(cat->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))) RETURN(-EINVAL); - + lir = (struct llog_logid_rec *)rec; logid = &lir->lid_id; rc = llog_create(ctxt, &handle, logid, NULL); @@ -374,7 +391,7 @@ static int llog_catinfo_cb(struct llog_handle *cat, rc = llog_init_handle(handle, 0, NULL); if (rc) GOTO(out_close, rc); - + for (index = 1; index < (LLOG_BITMAP_BYTES * 8); index++) { if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap)) count++; @@ -394,13 +411,13 @@ static int llog_catinfo_cb(struct llog_handle *cat, CWARN("Not enough memory\n"); rc = -ENOMEM; } - + out_close: llog_close(handle); RETURN(rc); } - -static int llog_catinfo_deletions(struct obd_device *obd, char *buf, + +static int llog_catinfo_deletions(struct obd_device *obd, char *buf, int buf_len) { struct mds_obd *mds = &obd->u.mds; @@ -412,24 +429,24 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf, int rc; struct cb_data data; struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - + if (ctxt == NULL || mds == NULL) RETURN(-EOPNOTSUPP); - + count = mds->mds_lov_desc.ld_tgt_count; size = sizeof(*idarray) * count; - + OBD_ALLOC(idarray, size); if (!idarray) RETURN(-ENOMEM); memset(idarray, 0, size); - + rc = llog_get_cat_list(obd, obd, name, count, idarray); - if (rc) + if (rc) GOTO(out_free, rc); push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); - + id = idarray; data.ctxt = ctxt; data.out = buf; @@ -448,19 +465,19 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf, if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap)) uncanceled++; } - l = snprintf(data.out, data.remains, + l = snprintf(data.out, data.remains, "\n[Catlog ID]: #"LPX64"#"LPX64"#%08x " "[Log Count]: %d\n", id->lgl_oid, id->lgl_ogr, id->lgl_ogen, uncanceled); - + data.out += l; data.remains -= l; data.init = 1; - - llog_process(handle, llog_catinfo_cb, &data); + + llog_process(handle, llog_catinfo_cb, &data, NULL); llog_close(handle); - + if (data.remains <= 0) break; } @@ -470,7 +487,7 @@ out_free: OBD_FREE(idarray, size); RETURN(rc); } - + int llog_catinfo(struct ptlrpc_request *req) { struct obd_export *exp = req->rq_export; @@ -503,7 +520,7 @@ int llog_catinfo(struct ptlrpc_request *req) if (strlen(buf) == 0) sprintf(buf, "%s", "No log informations\n"); memcpy(reply, buf, buf_len); - + out_free: OBD_FREE(buf, buf_len); return rc; diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 74aa72b..d28b0b6 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -50,6 +50,8 @@ #include #include "ptlrpc_internal.h" +#define LLCD_SIZE 4096 + #ifdef __KERNEL__ static struct llog_commit_master lustre_lcm; @@ -61,7 +63,7 @@ static int llcd_alloc(void) struct llog_canceld_ctxt *llcd; int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_ALLOC(llcd, PAGE_SIZE + offset); + OBD_ALLOC(llcd, LLCD_SIZE + offset); if (llcd == NULL) return -ENOMEM; @@ -107,7 +109,7 @@ static void llcd_put(struct llog_canceld_ctxt *llcd) int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) { - OBD_FREE(llcd, PAGE_SIZE + offset); + OBD_FREE(llcd, LLCD_SIZE + offset); } else { spin_lock(&lcm->lcm_llcd_lock); list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); @@ -128,8 +130,8 @@ void llcd_send(struct llog_canceld_ctxt *llcd) EXPORT_SYMBOL(llcd_send); /* deleted objects have a commit callback that cancels the MDS - * log record for the deletion. The commit callback calls this - * function + * log record for the deletion. The commit callback calls this + * function */ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, @@ -160,8 +162,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, cookies->lgc_lgl.lgl_ogen, cookies->lgc_index); GOTO(out, rc = -ENOMEM); } - llcd->llcd_import = ctxt->loc_imp; - llcd->llcd_gen = ctxt->loc_gen; + llcd->llcd_ctxt = ctxt; ctxt->loc_llcd = llcd; } @@ -170,9 +171,9 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, llcd->llcd_cookiebytes += sizeof(*cookies); send_now: - if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) || + if ((LLCD_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) || flags & OBD_LLOG_FL_SENDNOW)) { - CDEBUG(D_HA, "send llcd: %p\n", llcd); + CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt); ctxt->loc_llcd = NULL; llcd_send(llcd); } @@ -187,16 +188,17 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) int rc = 0; ENTRY; - LASSERT(ctxt->loc_llcd); - if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { - CWARN("import will be destroyed, put llcd %p\n", - ctxt->loc_llcd); - llcd_put(ctxt->loc_llcd); - ctxt->loc_llcd = NULL; + down(&ctxt->loc_sem); + if (ctxt->loc_llcd != NULL) { + CWARN("import will be destroyed, put " + "llcd %p:%p\n", ctxt->loc_llcd, ctxt); + llcd_put(ctxt->loc_llcd); + ctxt->loc_llcd = NULL; + ctxt->loc_imp = NULL; + } up(&ctxt->loc_sem); } else { - up(&ctxt->loc_sem); rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); } @@ -294,11 +296,11 @@ static int log_commit_thread(void *arg) llcd = list_entry(lcd->lcd_llcd_list.next, typeof(*llcd), llcd_list); LASSERT(llcd->llcd_lcm == lcm); - import = llcd->llcd_import; + import = llcd->llcd_ctxt->loc_imp; } list_for_each_entry_safe(llcd, n, sending_list, llcd_list) { LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_import) + if (import == llcd->llcd_ctxt->loc_imp) list_move_tail(&llcd->llcd_list, &lcd->lcd_llcd_list); } @@ -306,7 +308,7 @@ static int log_commit_thread(void *arg) list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend, llcd_list) { LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_import) + if (import == llcd->llcd_ctxt->loc_imp) list_move_tail(&llcd->llcd_list, &lcd->lcd_llcd_list); } @@ -316,31 +318,29 @@ static int log_commit_thread(void *arg) /* We are the only one manipulating our local list - no lock */ list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){ char *bufs[1] = {(char *)llcd->llcd_cookies}; - struct obd_device *obd = import->imp_obd; - struct llog_ctxt *ctxt; list_del(&llcd->llcd_list); if (llcd->llcd_cookiebytes == 0) { - CDEBUG(D_HA, "just put empty llcd %p\n", llcd); + CDEBUG(D_HA, "put empty llcd %p:%p\n", + llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } - /* check whether the cookies are new. if new then send, otherwise - * just put llcd */ - ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1); - LASSERT(ctxt != NULL); - down(&ctxt->loc_sem); - if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) { - up(&ctxt->loc_sem); - CDEBUG(D_HA, "just put stale llcd %p\n", llcd); + + down(&llcd->llcd_ctxt->loc_sem); + if (llcd->llcd_ctxt->loc_imp == NULL) { + up(&llcd->llcd_ctxt->loc_sem); + CWARN("import will be destroyed, put " + "llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } - up(&ctxt->loc_sem); request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1, &llcd->llcd_cookiebytes, bufs); + up(&llcd->llcd_ctxt->loc_sem); + if (request == NULL) { rc = -ENOMEM; CERROR("error preparing commit: rc %d\n", rc); @@ -354,8 +354,18 @@ static int log_commit_thread(void *arg) } request->rq_replen = lustre_msg_size(0, NULL); + down(&llcd->llcd_ctxt->loc_sem); + if (llcd->llcd_ctxt->loc_imp == NULL) { + up(&llcd->llcd_ctxt->loc_sem); + CWARN("import will be destroyed, put " + "llcd %p:%p\n", llcd, llcd->llcd_ctxt); + llcd_put(llcd); + ptlrpc_req_finished(request); + continue; + } rc = ptlrpc_queue_wait(request); ptlrpc_req_finished(request); + up(&llcd->llcd_ctxt->loc_sem); /* If the RPC failed, we put this and the remaining * messages onto the resend list for another time. */ @@ -364,7 +374,7 @@ static int log_commit_thread(void *arg) continue; } -#if 0 /* FIXME just put llcd, not send it again */ +#if 0 /* FIXME just put llcd, not put it on resend list */ spin_lock(&lcm->lcm_llcd_lock); list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend); if (++llcd->llcd_tries < 5) { @@ -377,13 +387,15 @@ static int log_commit_thread(void *arg) } else { spin_unlock(&lcm->lcm_llcd_lock); #endif - CERROR("commit %p dropped %d cookies: rc %d\n", - llcd, (int)(llcd->llcd_cookiebytes / - sizeof(*llcd->llcd_cookies)), - rc); + CERROR("commit %p:%p drop %d cookies: rc %d\n", + llcd, llcd->llcd_ctxt, + (int)(llcd->llcd_cookiebytes / + sizeof(*llcd->llcd_cookies)), rc); llcd_put(llcd); -// } +#if 0 + } break; +#endif } if (rc == 0) { @@ -439,11 +451,12 @@ int llog_start_commit_thread(void) EXPORT_SYMBOL(llog_start_commit_thread); static struct llog_process_args { - struct semaphore llpa_sem; + struct semaphore llpa_sem; struct llog_ctxt *llpa_ctxt; void *llpa_cb; void *llpa_arg; } llpa; + int llog_init_commit_master(void) { INIT_LIST_HEAD(&lcm->lcm_thread_busy); @@ -475,7 +488,6 @@ int llog_cleanup_commit_master(int force) return 0; } - static int log_process_thread(void *args) { struct llog_process_args *data = args; @@ -486,17 +498,17 @@ static int log_process_thread(void *args) unsigned long flags; int rc; ENTRY; - + up(&data->llpa_sem); lock_kernel(); ptlrpc_daemonize(); /* thread never needs to do IO */ - + SIGNAL_MASK_LOCK(current, flags); sigfillset(¤t->blocked); RECALC_SIGPENDING; SIGNAL_MASK_UNLOCK(current, flags); unlock_kernel(); - + rc = llog_create(ctxt, &llh, &logid, NULL); if (rc) { CERROR("llog_create failed %d\n", rc); @@ -507,26 +519,27 @@ static int log_process_thread(void *args) CERROR("llog_init_handle failed %d\n", rc); GOTO(out, rc); } - + if (cb) { rc = llog_cat_process(llh, (llog_cb_t)cb, NULL); - if (rc) + if (rc != LLOG_PROC_BREAK) CERROR("llog_cat_process failed %d\n", rc); - } else + } else { CWARN("no callback function for recovery\n"); + } - CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd); + CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n", + ctxt->loc_llcd, ctxt); llog_sync(ctxt, NULL); out: rc = llog_cat_put(llh); if (rc) CERROR("llog_cat_put failed %d\n", rc); - + RETURN(rc); } -static int llog_recovery_generic(struct llog_ctxt *ctxt, - void *handle, - void *arg) + +static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg) { int rc; ENTRY; @@ -546,31 +559,33 @@ static int llog_recovery_generic(struct llog_ctxt *ctxt, RETURN(rc); } + int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen) + struct llog_logid *logid, struct llog_gen *gen) { struct llog_canceld_ctxt *llcd; int rc; ENTRY; - + + /* send back llcd before recovery from llog */ + if (ctxt->loc_llcd != NULL) { + CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt); + llog_sync(ctxt, NULL); + } + down(&ctxt->loc_sem); ctxt->loc_gen = *gen; - llcd = ctxt->loc_llcd; - if (llcd) { - CDEBUG(D_HA, "put current llcd when new connection arrives\n"); - llcd_put(llcd); - } llcd = llcd_grab(); if (llcd == NULL) { CERROR("couldn't get an llcd\n"); + up(&ctxt->loc_sem); RETURN(-ENOMEM); } - llcd->llcd_import = ctxt->loc_imp; - llcd->llcd_gen = ctxt->loc_gen; + llcd->llcd_ctxt = ctxt; ctxt->loc_llcd = llcd; up(&ctxt->loc_sem); - rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); + rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); if (rc != 0) CERROR("error recovery process: %d\n", rc); diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 034ee8a..bed8b61 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -14,7 +14,7 @@ init_test_env $@ . ${CONFIG:=$LUSTRE/tests/cfg/local.sh} # Skip these tests -ALWAYS_EXCEPT="35" +ALWAYS_EXCEPT="" gen_config() { @@ -707,5 +707,41 @@ test_37() { } run_test 37 "abort recovery before client does replay (test mds_cleanup_orphans for directories)" +test_38() { + for i in `seq 1 800`; do + touch $DIR/$tfile-$i + done + for i in `seq 1 400`; do + rm $DIR/$tfile-$i + done + + replay_barrier mds + fail mds + for i in `seq 401 800`; do + rm $DIR/$tfile-$i + done + sleep 2 + $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true +} +run_test 38 "test recovery from unlink llog (test llog_gen_rec) " + +test_39() { + for i in `seq 1 800`; do + touch $DIR/$tfile-$i + done + + replay_barrier mds + for i in `seq 1 400`; do + rm $DIR/$tfile-$i + done + fail mds + for i in `seq 401 800`; do + rm $DIR/$tfile-$i + done + sleep 2 + $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true +} +run_test 39 "test recovery from unlink llog (test llog_gen_rec) " + equals_msg test complete, cleaning up $CLEANUP diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 7397fcb..18810ae 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -137,6 +137,12 @@ check_obdo(void) CHECK_VALUE(OBD_MD_FLOSCOPQ); CHECK_VALUE(OBD_MD_FLCOOKIE); CHECK_VALUE(OBD_MD_FLGROUP); + + CHECK_VALUE(OBD_FL_INLINEDATA); + CHECK_VALUE(OBD_FL_OBDMDEXISTS); + CHECK_VALUE(OBD_FL_DELORPHAN); + CHECK_VALUE(OBD_FL_NORPC); + CHECK_VALUE(OBD_FL_IDONLY); } void @@ -206,6 +212,7 @@ check_niobuf_remote(void) CHECK_VALUE(OBD_BRW_WRITE); CHECK_VALUE(OBD_BRW_CREATE); CHECK_VALUE(OBD_BRW_SYNC); + CHECK_VALUE(OBD_BRW_FROM_GRANT); } void @@ -269,6 +276,7 @@ check_mds_body(void) CHECK_VALUE(FMODE_EXEC); CHECK_VALUE(MDS_OPEN_CREAT); CHECK_VALUE(MDS_OPEN_EXCL); + CHECK_VALUE(MDS_OPEN_TRUNC); CHECK_VALUE(MDS_OPEN_APPEND); CHECK_VALUE(MDS_OPEN_SYNC); CHECK_VALUE(MDS_OPEN_DIRECTORY); @@ -508,6 +516,7 @@ check_llog_logid(void) CHECK_VALUE(MDS_UNLINK_REC); CHECK_VALUE(OBD_CFG_REC); CHECK_VALUE(PTL_CFG_REC); + CHECK_VALUE(LLOG_GEN_REC); CHECK_VALUE(LLOG_HDR_MAGIC); CHECK_VALUE(LLOG_LOGID_MAGIC); } @@ -532,6 +541,79 @@ check_llog_rec_tail(void) } void +check_llog_logid_rec(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_logid_rec); + CHECK_MEMBER(llog_logid_rec, lid_hdr); + CHECK_MEMBER(llog_logid_rec, lid_id); + CHECK_MEMBER(llog_logid_rec, lid_tail); +} + +void +check_llog_create_rec(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_create_rec); + CHECK_MEMBER(llog_create_rec, lcr_hdr); + CHECK_MEMBER(llog_create_rec, lcr_fid); + CHECK_MEMBER(llog_create_rec, lcr_oid); + CHECK_MEMBER(llog_create_rec, lcr_ogen); +} + +void +check_llog_orphan_rec(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_orphan_rec); + CHECK_MEMBER(llog_orphan_rec, lor_hdr); + CHECK_MEMBER(llog_orphan_rec, lor_oid); + CHECK_MEMBER(llog_orphan_rec, lor_ogen); + CHECK_MEMBER(llog_orphan_rec, lor_tail); +} + +void +check_llog_unlink_rec(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_unlink_rec); + CHECK_MEMBER(llog_unlink_rec, lur_hdr); + CHECK_MEMBER(llog_unlink_rec, lur_oid); + CHECK_MEMBER(llog_unlink_rec, lur_ogen); + CHECK_MEMBER(llog_unlink_rec, lur_tail); +} + +void +check_llog_size_change_rec(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_size_change_rec); + CHECK_MEMBER(llog_size_change_rec, lsc_hdr); + CHECK_MEMBER(llog_size_change_rec, lsc_fid); + CHECK_MEMBER(llog_size_change_rec, lsc_io_epoch); + CHECK_MEMBER(llog_size_change_rec, lsc_tail); +} + +void +check_llog_gen(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_gen); + CHECK_MEMBER(llog_gen, mnt_cnt); + CHECK_MEMBER(llog_gen, conn_cnt); +} + +void +check_llog_gen_rec(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_gen_rec); + CHECK_MEMBER(llog_gen_rec, lgr_hdr); + CHECK_MEMBER(llog_gen_rec, lgr_gen); + CHECK_MEMBER(llog_gen_rec, lgr_tail); +} + +void check_llog_log_hdr(void) { BLANK_LINE(); @@ -577,15 +659,8 @@ check_llogd_body(void) CHECK_VALUE(LLOG_ORIGIN_HANDLE_READ_HEADER); CHECK_VALUE(LLOG_ORIGIN_HANDLE_WRITE_REC); CHECK_VALUE(LLOG_ORIGIN_HANDLE_CLOSE); -} - -void -check_llog_ctxt_gen(void) -{ - BLANK_LINE(); - CHECK_STRUCT(llog_ctxt_gen); - CHECK_MEMBER(llog_ctxt_gen, mnt_cnt); - CHECK_MEMBER(llog_ctxt_gen, conn_cnt); + CHECK_VALUE(LLOG_ORIGIN_CONNECT); + CHECK_VALUE(LLOG_CATINFO); } void @@ -640,10 +715,6 @@ main(int argc, char **argv) CHECK_VALUE(OST_SAN_WRITE); CHECK_VALUE(OST_SYNC); CHECK_VALUE(OST_LAST_OPC); - CHECK_VALUE(OST_FIRST_OPC); - - CHECK_VALUE(OBD_FL_INLINEDATA); - CHECK_VALUE(OBD_FL_OBDMDEXISTS); CHECK_DEFINE(OBD_OBJECT_EOF); @@ -658,8 +729,11 @@ main(int argc, char **argv) CHECK_VALUE(MDS_DISCONNECT); CHECK_VALUE(MDS_GETSTATUS); CHECK_VALUE(MDS_STATFS); + CHECK_VALUE(MDS_PIN); + CHECK_VALUE(MDS_UNPIN); + CHECK_VALUE(MDS_SYNC); + CHECK_VALUE(MDS_DONE_WRITING); CHECK_VALUE(MDS_LAST_OPC); - CHECK_VALUE(MDS_FIRST_OPC); CHECK_VALUE(REINT_SETATTR); CHECK_VALUE(REINT_CREATE); @@ -687,7 +761,13 @@ main(int argc, char **argv) CHECK_VALUE(LDLM_BL_CALLBACK); CHECK_VALUE(LDLM_CP_CALLBACK); CHECK_VALUE(LDLM_LAST_OPC); - CHECK_VALUE(LDLM_FIRST_OPC); + + CHECK_VALUE(LCK_EX); + CHECK_VALUE(LCK_PW); + CHECK_VALUE(LCK_PR); + CHECK_VALUE(LCK_CW); + CHECK_VALUE(LCK_CR); + CHECK_VALUE(LCK_NL); CHECK_VALUE(PTLBD_QUERY); CHECK_VALUE(PTLBD_READ); @@ -696,9 +776,14 @@ main(int argc, char **argv) CHECK_VALUE(PTLBD_CONNECT); CHECK_VALUE(PTLBD_DISCONNECT); CHECK_VALUE(PTLBD_LAST_OPC); - CHECK_VALUE(PTLBD_FIRST_OPC); + + CHECK_VALUE(MGMT_CONNECT); + CHECK_VALUE(MGMT_DISCONNECT); + CHECK_VALUE(MGMT_EXCEPTION); CHECK_VALUE(OBD_PING); + CHECK_VALUE(OBD_LOG_CANCEL); + CHECK_VALUE(OBD_LAST_OPC); COMMENT("Sizes and Offsets"); BLANK_LINE(); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 9a37aa43..3535aaa 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -49,9 +49,6 @@ void lustre_assert_wire_constants(void) LASSERT(OST_SAN_WRITE == 15); LASSERT(OST_SYNC == 16); LASSERT(OST_LAST_OPC == 18); - LASSERT(OST_FIRST_OPC == 0); - LASSERT(OBD_FL_INLINEDATA == 1); - LASSERT(OBD_FL_OBDMDEXISTS == 2); LASSERT(OBD_OBJECT_EOF == 0xffffffffffffffffULL); LASSERT(OST_REQ_HAS_OA1 == 1); LASSERT(MDS_GETATTR == 33); @@ -63,8 +60,11 @@ void lustre_assert_wire_constants(void) LASSERT(MDS_DISCONNECT == 39); LASSERT(MDS_GETSTATUS == 40); LASSERT(MDS_STATFS == 41); - LASSERT(MDS_LAST_OPC == 45); - LASSERT(MDS_FIRST_OPC == 33); + LASSERT(MDS_PIN == 42); + LASSERT(MDS_UNPIN == 43); + LASSERT(MDS_SYNC == 44); + LASSERT(MDS_DONE_WRITING == 45); + LASSERT(MDS_LAST_OPC == 46); LASSERT(REINT_SETATTR == 1); LASSERT(REINT_CREATE == 2); LASSERT(REINT_LINK == 3); @@ -87,7 +87,12 @@ void lustre_assert_wire_constants(void) LASSERT(LDLM_BL_CALLBACK == 104); LASSERT(LDLM_CP_CALLBACK == 105); LASSERT(LDLM_LAST_OPC == 106); - LASSERT(LDLM_FIRST_OPC == 101); + LASSERT(LCK_EX == 1); + LASSERT(LCK_PW == 2); + LASSERT(LCK_PR == 3); + LASSERT(LCK_CW == 4); + LASSERT(LCK_CR == 5); + LASSERT(LCK_NL == 6); LASSERT(PTLBD_QUERY == 200); LASSERT(PTLBD_READ == 201); LASSERT(PTLBD_WRITE == 202); @@ -95,8 +100,12 @@ void lustre_assert_wire_constants(void) LASSERT(PTLBD_CONNECT == 204); LASSERT(PTLBD_DISCONNECT == 205); LASSERT(PTLBD_LAST_OPC == 206); - LASSERT(PTLBD_FIRST_OPC == 200); + LASSERT(MGMT_CONNECT == 250); + LASSERT(MGMT_DISCONNECT == 251); + LASSERT(MGMT_EXCEPTION == 252); LASSERT(OBD_PING == 400); + LASSERT(OBD_LOG_CANCEL == 401); + LASSERT(OBD_LAST_OPC == 402); /* Sizes and Offsets */ @@ -197,6 +206,11 @@ void lustre_assert_wire_constants(void) LASSERT(OBD_MD_FLOSCOPQ == 4194304); LASSERT(OBD_MD_FLCOOKIE == 8388608); LASSERT(OBD_MD_FLGROUP == 16777216); + LASSERT(OBD_FL_INLINEDATA == 1); + LASSERT(OBD_FL_OBDMDEXISTS == 2); + LASSERT(OBD_FL_DELORPHAN == 4); + LASSERT(OBD_FL_NORPC == 8); + LASSERT(OBD_FL_IDONLY == 16); /* Checks for struct lov_mds_md_v1 */ LASSERT((int)sizeof(struct lov_mds_md_v1) == 32); @@ -274,6 +288,7 @@ void lustre_assert_wire_constants(void) LASSERT(OBD_BRW_WRITE == 2); LASSERT(OBD_BRW_CREATE == 4); LASSERT(OBD_BRW_SYNC == 8); + LASSERT(OBD_BRW_FROM_GRANT == 32); /* Checks for struct ost_body */ LASSERT((int)sizeof(struct ost_body) == 168); @@ -349,6 +364,7 @@ void lustre_assert_wire_constants(void) LASSERT(FMODE_EXEC == 4); LASSERT(MDS_OPEN_CREAT == 64); LASSERT(MDS_OPEN_EXCL == 128); + LASSERT(MDS_OPEN_TRUNC == 512); LASSERT(MDS_OPEN_APPEND == 1024); LASSERT(MDS_OPEN_SYNC == 4096); LASSERT(MDS_OPEN_DIRECTORY == 65536); @@ -617,6 +633,7 @@ void lustre_assert_wire_constants(void) LASSERT(MDS_UNLINK_REC == 274801668); LASSERT(OBD_CFG_REC == 274857984); LASSERT(PTL_CFG_REC == 274923520); + LASSERT(LLOG_GEN_REC == 274989056); LASSERT(LLOG_HDR_MAGIC == 275010873); LASSERT(LLOG_LOGID_MAGIC == 275010874); @@ -691,6 +708,8 @@ void lustre_assert_wire_constants(void) LASSERT(LLOG_ORIGIN_HANDLE_READ_HEADER == 503); LASSERT(LLOG_ORIGIN_HANDLE_WRITE_REC == 504); LASSERT(LLOG_ORIGIN_HANDLE_CLOSE == 505); + LASSERT(LLOG_ORIGIN_CONNECT == 506); + LASSERT(LLOG_CATINFO == 507); /* Checks for struct llogd_conn_body */ LASSERT((int)sizeof(struct llogd_conn_body) == 40); -- 1.8.3.1