/* portals/include/config.h.in. Generated from configure.in by autoheader. */
-/* Compile with orphan support */
-#undef ENABLE_ORPHANS
-
/* Use the Pinger */
#undef ENABLE_PINGER
tbd Cluster File Systems, Inc. <info@clusterfs.com>
* version 1.2.0
* bug fixes
+ - orphan handling to avoid losing space on client/server crashes
2004-01-27 Cluster File Systems, Inc. <info@clusterfs.com>
* version 1.0.3
- ENOMEM detection and retry on socknal sends (2230)
- use GFP_NOFS throughout Lustre, to combat ENOMEM (2230)
- move osc_rpcd into ptlrpc, for use in MDC and others (2329)
- - protect MDS inode fsdata with stronger locking (2313)
+ - protect MDS inode fsdata with stronger locking; fixes assertion (2313)
- better error messages when a client is rejected during recovery (1505)
- avoid cancelling locks which were never granted, after failure (2330)
- fix i_sem/journal inversion in mds_client_add (2333)
AC_DEFINE(ENABLE_PINGER, 1, Use the Pinger)
fi
-# very experimental orphan support
-AC_ARG_ENABLE(orphans, [ --enable-orphans very experimental orphan recovery support])
-if test x$enable_orphans = xyes ; then
- AC_DEFINE(ENABLE_ORPHANS, 1, Compile with orphan support)
-fi
-
AC_ARG_WITH(obd-buffer-size, [ --with-obd-buffer-size=[size] set lctl ioctl maximum bytes (default=8192)],OBD_BUFFER_SIZE=$with_obd_buffer_size,OBD_BUFFER_SIZE=8192)
AC_DEFINE_UNQUOTED(OBD_MAX_IOCTL_BUFFER, $OBD_BUFFER_SIZE, [IOCTL Buffer Size])
struct llog_canceld_ctxt {
struct list_head llcd_list; /* free or pending struct list */
- struct obd_import *llcd_import;
+ struct llog_ctxt *llcd_ctxt;
struct llog_commit_master *llcd_lcm;
int llcd_tries; /* number of tries to send */
- struct llog_ctxt_gen llcd_gen;
int llcd_cookiebytes;
struct llog_cookie llcd_cookies[0];
};
#define FSFILT_OP_LINK 9
#define FSFILT_OP_CREATE_LOG 10
#define FSFILT_OP_UNLINK_LOG 11
+#define FSFILT_OP_CANCEL_UNLINK_LOG 12
static inline void *fsfilt_start(struct obd_device *obd, struct inode *inode,
int op, struct obd_trans_info *oti)
return (struct llog_cookie *)(oa->o_inline +
sizeof(struct lustre_handle));
}
-/* don't forget obdo_fid which is way down at the bottom so it can
+/* don't forget obdo_fid which is way down at the bottom so it can
* come after the definition of llog_cookie */
struct obd_statfs {
#define DISP_OPEN_OPEN (1 << 5)
#define DISP_ENQ_COMPLETE (1<<6)
-
-struct ll_uctxt {
- __u32 gid1;
- __u32 gid2;
-};
-
struct ll_fid {
__u64 id;
__u32 generation;
/*
* Opcodes for management/monitoring node.
*/
-#define MGMT_CONNECT 250
-#define MGMT_DISCONNECT 251
-#define MGMT_EXCEPTION 252 /* node died, etc. */
+typedef enum {
+ MGMT_CONNECT = 250,
+ MGMT_DISCONNECT,
+ MGMT_EXCEPTION, /* node died, etc. */
+ MGMT_LAST_OPC
+} mgmt_cmd_t;
+#define MGMT_FIRST_OPC MGMT_CONNECT
/*
* Opcodes for multiple servers.
*/
-#define OBD_PING 400
-#define OBD_LOG_CANCEL 401
-#define OBD_LAST_OPC (OBD_LOG_CANCEL + 1)
+typedef enum {
+ OBD_PING = 400,
+ OBD_LOG_CANCEL,
+ OBD_LAST_OPC
+} obd_cmd_t;
#define OBD_FIRST_OPC OBD_PING
/* catalog of log objects */
MDS_UNLINK_REC = 0x10610000 | (MDS_REINT << 8) | REINT_UNLINK,
OBD_CFG_REC = 0x10620000,
PTL_CFG_REC = 0x10630000,
+ LLOG_GEN_REC = 0x10640000,
LLOG_HDR_MAGIC = 0x10645539,
LLOG_LOGID_MAGIC = 0x1064553a,
} llog_op_type;
struct llog_rec_tail lsc_tail;
} __attribute__((packed));
+struct llog_gen {
+ __u64 mnt_cnt;
+ __u64 conn_cnt;
+} __attribute__((packed));
+
+struct llog_gen_rec {
+ struct llog_rec_hdr lgr_hdr;
+ struct llog_gen lgr_gen;
+ struct llog_rec_tail lgr_tail;
+};
/* On-disk header structure of each log object, stored in little endian order */
#define LLOG_CHUNK_SIZE 4096
#define LLOG_HEADER_SIZE (96)
__u32 llh_size;
__u32 llh_flags;
__u32 llh_cat_idx;
+ /* for a catlog the first plain slot is next to it */
struct obd_uuid llh_tgtuuid;
__u32 llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32) - 23];
__u32 llh_bitmap[LLOG_BITMAP_BYTES/sizeof(__u32)];
__u64 lgd_cur_offset;
} __attribute__((packed));
-struct llog_ctxt_gen {
- __u64 mnt_cnt;
- __u64 conn_cnt;
-};
-
struct llogd_conn_body {
- struct llog_ctxt_gen lgdc_gen;
+ struct llog_gen lgdc_gen;
struct llog_logid lgdc_logid;
__u32 lgdc_ctxt_idx;
} __attribute__((packed));
snprintf(logname, sizeof(logname), "LOGS/%s", name)
struct plain_handle_data {
- struct list_head phd_entry;
- struct llog_handle *phd_cat_handle;
- struct llog_cookie phd_cookie; /* cookie of this log in its cat */
- int phd_last_idx;
+ struct list_head phd_entry;
+ struct llog_handle *phd_cat_handle;
+ struct llog_cookie phd_cookie; /* cookie of this log in its cat */
+ int phd_last_idx;
};
struct cat_handle_data {
/* llog.c - general API */
typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
-int llog_init_handle(struct llog_handle *handle, int flags,
+int llog_init_handle(struct llog_handle *handle, int flags,
struct obd_uuid *uuid);
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data);
+int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata);
extern struct llog_handle *llog_alloc_handle(void);
extern void llog_free_handle(struct llog_handle *handle);
extern int llog_close(struct llog_handle *cathandle);
void *lpd_data;
llog_cb_t lpd_cb;
};
+
+struct llog_process_cat_data {
+ int first_idx;
+ int last_idx;
+ /* to process catlog across zero record */
+};
+
int llog_cat_put(struct llog_handle *cathandle);
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
struct llog_cookie *reccookie, void *buf);
int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
struct llog_cookie *cookies);
int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
+int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
/* llog_obd.c */
int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
- int count, struct llog_logid *logid, struct llog_operations *op);
+ int count, struct llog_logid *logid,struct llog_operations *op);
int llog_cleanup(struct llog_ctxt *);
int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_add(struct llog_ctxt *ctxt,
- struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
- struct llog_cookie *logcookies, int numcookies);
+int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+ struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
+ int numcookies);
int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
- int count, struct llog_cookie *cookies, int flags);
+ int count, struct llog_cookie *cookies, int flags);
-int llog_obd_origin_setup(struct obd_device *obd, int index,
- struct obd_device *disk_obd, int count,
+int llog_obd_origin_setup(struct obd_device *obd, int index,
+ struct obd_device *disk_obd, int count,
struct llog_logid *logid);
int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
int llog_obd_origin_add(struct llog_ctxt *ctxt,
/* llog_ioctl.c */
int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data);
-int llog_catlog_list(struct obd_device *obd, int count,
+int llog_catlog_list(struct obd_device *obd, int count,
struct obd_ioctl_data *data);
/* llog_net.c */
int llog_initiator_connect(struct llog_ctxt *ctxt);
int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen);
int llog_handle_connect(struct ptlrpc_request *req);
/* recov_thread.c */
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags);
int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_repl_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen);
+int llog_repl_connect(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid, struct llog_gen *gen);
struct llog_operations {
int (*lop_write_rec)(struct llog_handle *loghandle,
- struct llog_rec_hdr *rec,
- struct llog_cookie *logcookies,
- int numcookies,
- void *,
- int idx);
+ struct llog_rec_hdr *rec,
+ struct llog_cookie *logcookies, int numcookies,
+ void *, int idx);
int (*lop_destroy)(struct llog_handle *handle);
- int (*lop_next_block)(struct llog_handle *h,
- int *curr_idx,
- int next_idx,
- __u64 *offset,
- void *buf,
- int len);
+ int (*lop_next_block)(struct llog_handle *h, int *curr_idx,
+ int next_idx, __u64 *offset, void *buf, int len);
int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
struct llog_logid *logid, char *name);
int (*lop_close)(struct llog_handle *handle);
int (*lop_read_header)(struct llog_handle *handle);
- int (*lop_setup)(struct obd_device *obd, int ctxt_idx,
- struct obd_device *disk_obd, int count,
+ int (*lop_setup)(struct obd_device *obd, int ctxt_idx,
+ struct obd_device *disk_obd, int count,
struct llog_logid *logid);
int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp);
int (*lop_cleanup)(struct llog_ctxt *ctxt);
- int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
- struct lov_stripe_md *lsm,
+ int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+ struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
int count, struct llog_cookie *cookies, int flags);
int (*lop_connect)(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen);
/* XXX add 2 more: commit callbacks and llog recovery functions */
};
struct llog_ctxt {
int loc_idx; /* my index the obd array of ctxt's */
- struct llog_ctxt_gen loc_gen;
+ struct llog_gen loc_gen;
struct obd_device *loc_obd; /* points back to the containing obd*/
struct obd_export *loc_exp;
- struct obd_import *loc_imp; /* to use in RPC's: can be backward
+ struct obd_import *loc_imp; /* to use in RPC's: can be backward
pointing import */
struct llog_operations *loc_logops;
struct llog_handle *loc_handle;
void *llog_proc_cb;
};
-static inline void log_gen_init(struct llog_ctxt *ctxt)
+static inline void llog_gen_init(struct llog_ctxt *ctxt)
{
struct obd_device *obd = ctxt->loc_exp->exp_obd;
if (!strcmp(obd->obd_type->typ_name, "mds"))
ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count;
- else if (!strstr(obd->obd_type->typ_name, "filter")) {
- ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count;
- }
+ else if (!strstr(obd->obd_type->typ_name, "filter"))
+ ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count;
else
- ctxt->loc_gen.mnt_cnt = 0;
+ ctxt->loc_gen.mnt_cnt = 0;
}
-static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b)
+static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b)
{
if (a.mnt_cnt < b.mnt_cnt)
return 1;
return(a.conn_cnt < b.conn_cnt ? 1 : 0);
}
+#define LLOG_GEN_INC(gen) ((gen).conn_cnt) ++
+#define LLOG_PROC_BREAK 0x0001
static inline int llog_obd2ops(struct llog_ctxt *ctxt,
struct llog_operations **lop)
{
if (ctxt == NULL)
return -ENOTCONN;
+
*lop = ctxt->loc_logops;
if (*lop == NULL)
return -EOPNOTSUPP;
+
return 0;
}
{
if (loghandle == NULL)
return -EINVAL;
+
return llog_obd2ops(loghandle->lgh_ctxt, lop);
}
{
if (index < 0 || index >= LLOG_MAX_CTXTS)
return NULL;
- else
- return obd->obd_llog_ctxt[index];
+
+ return obd->obd_llog_ctxt[index];
}
static inline int llog_write_rec(struct llog_handle *handle,
struct llog_operations *lop;
int rc, buflen;
ENTRY;
-
+
rc = llog_handle2ops(handle, &lop);
if (rc)
RETURN(rc);
struct llog_operations *lop;
int rc;
ENTRY;
-
+
rc = llog_handle2ops(handle, &lop);
if (rc)
RETURN(rc);
struct llog_operations *lop;
int rc;
ENTRY;
-
+
rc = llog_handle2ops(handle, &lop);
if (rc)
RETURN(rc);
rc = lop->lop_cancel(exp, lsm, count, cookies, flags);
RETURN(rc);
}
-#endif
+#endif
static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx,
int next_idx, __u64 *cur_offset, void *buf,
RETURN(rc);
}
-static inline int llog_create(struct llog_ctxt *ctxt,
- struct llog_handle **res,
+static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
struct llog_logid *logid, char *name)
{
struct llog_operations *lop;
}
static inline int llog_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid,
- struct llog_ctxt_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen)
{
struct llog_operations *lop;
int rc;
struct lov_stripe_md *lsm;
};
+struct ll_uctxt {
+ __u32 gid1;
+ __u32 gid2;
+};
+
struct mdc_op_data {
struct ll_fid fid1;
struct ll_fid fid2;
class_disconnect_exports(obd, 0);
- /* when recovery was abort, cleanup orphans for mds */
+ /* when recovery was aborted, cleanup orphans on mds and ost */
if (OBT(obd) && OBP(obd, postrecov)) {
rc = OBP(obd, postrecov)(obd);
if (rc >= 0)
obd->obd_name);
obd->obd_recovering = 0;
- /* when recovering finished, cleanup orphans for mds */
+ /* when recovery finished, cleanup orphans on mds and ost */
if (OBT(obd) && OBP(obd, postrecov)) {
rc2 = OBP(obd, postrecov)(obd);
if (rc2 >= 0)
- CWARN("%s: all clients recovered, %d MDS orphans "
- "deleted\n", obd->obd_name, rc2);
+ CWARN("%s: all clients recovered, %d MDS "
+ "orphans deleted\n", obd->obd_name, rc2);
else
CERROR("postrecov failed %d\n", rc2);
}
LBUG();
return NULL;
}
-
LASSERT(req->rq_export != NULL);
LASSERT(req->rq_export->exp_obd != NULL);
-#ifdef ENABLE_ORPHANS
/* FIXME - how to send reply */
if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
int rc = llog_origin_handle_cancel(req);
ldlm_callback_reply(req, rc);
RETURN(0);
}
-
if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
int rc = llog_origin_handle_create(req);
req->rq_status = rc;
ptlrpc_reply(req);
RETURN(0);
}
-
if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
int rc = llog_origin_handle_next_block(req);
req->rq_status = rc;
ptlrpc_reply(req);
RETURN(0);
}
-
if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
int rc = llog_origin_handle_read_header(req);
req->rq_status = rc;
ptlrpc_reply(req);
RETURN(0);
}
-
if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
int rc = llog_origin_handle_close(req);
ldlm_callback_reply(req, rc);
RETURN(0);
}
-#endif
+
ns = req->rq_export->exp_obd->obd_namespace;
LASSERT(ns != NULL);
#include "lov_internal.h"
-#if 0
-static int lov_logop_cleanup(struct llog_ctxt *ctxt)
-{
- struct lov_obd *lov = &ctxt->loc_obd->u.lov;
- int i, rc = 0;
-
- ENTRY;
- for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
- struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx);
- rc = llog_cleanup(cctxt);
- if (rc) {
- CERROR("error lov_llog_open %d\n", i);
- break;
- }
- }
- RETURN(rc);
-}
-#endif
-
/* Add log records for each OSC that this object is striped over, and return
* cookies for each one. We _would_ have nice abstraction here, except that
* we need to keep cookies in stripe order, even if some are NULL, so that
lur->lur_oid = loi->loi_id;
lur->lur_ogen = loi->loi_gr;
- rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc, numcookies - rc);
+ rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc,
+ numcookies - rc);
}
OBD_FREE(lur, sizeof(*lur));
}
static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid,
- struct llog_ctxt_gen *gen)
+ struct llog_logid *logid,
+ struct llog_gen *gen)
{
struct obd_device *obd = ctxt->loc_obd;
struct lov_obd *lov = &obd->u.lov;
{
/* For updates to the last recieved file */
int nblocks = EXT3_DATA_TRANS_BLOCKS;
+ int blocksize, block_count = 0;
void *handle;
if (current->journal_info) {
/* Setattr on inode */
nblocks += 1;
break;
+ case FSFILT_OP_CANCEL_UNLINK_LOG:
+ blocksize = 1 << inode->i_blkbits;
+ block_count = (blocksize - 1) + LLOG_CHUNK_SIZE;
+ block_count = (block_count + blocksize - 1) >> inode->i_blkbits;
+ block_count = block_count * EXT3_DATA_TRANS_BLOCKS + 2;
+ nblocks = 2 * 2 * block_count;
+ break;
default: CERROR("unknown transaction start op %d\n", op);
LBUG();
}
int mds_handle(struct ptlrpc_request *req)
{
- int should_process;
+ int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
int rc = 0;
struct mds_obd *mds = NULL; /* quell gcc overwarning */
struct obd_device *obd = NULL;
break;
rc = mds_reint(req, 0, NULL);
- OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
+ fail = OBD_FAIL_MDS_REINT_NET_REP;
break;
}
rc = req->rq_status = -ENOTCONN;
}
- target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET);
+ target_send_reply(req, rc, fail);
return 0;
}
LASSERT(!obd->obd_recovering);
-#ifdef ENABLE_ORPHANS
rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
if (rc != 0) {
CERROR("faild at llog_origin_connect: %d\n", rc);
}
-#endif
+
rc = mds_cleanup_orphans(obd);
rc2 = mds_lov_set_nextid(obd);
static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid,
- struct llog_ctxt_gen *gen)
+ struct llog_gen *gen)
{
struct obd_device *obd = ctxt->loc_obd;
struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
{
struct mds_obd *mds = &obd->u.mds;
struct lov_stripe_md *lsm = NULL;
-#ifdef ENABLE_ORPHANS
struct llog_ctxt *ctxt;
-#endif
int rc;
ENTRY;
if (rc < 0)
RETURN(rc);
-#ifdef ENABLE_ORPHANS
ctxt = llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT);
rc = llog_add(ctxt, NULL, lsm, lustre_msg_buf(repmsg, offset + 1, 0),
repmsg->buflens[offset + 1] / sizeof(struct llog_cookie));
-#endif
obd_free_memmd(mds->mds_osc_exp, &lsm);
}
valsize = sizeof(mds->mds_lov_desc);
- rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc",
+ rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc",
&valsize, &mds->mds_lov_desc);
- if (rc)
+ if (rc)
GOTO(err_reg, rc);
mds->mds_max_mdsize = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
if (rc) {
CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
GOTO(err_reg, rc);
- }
+ }
-#ifdef ENABLE_ORPHANS
rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
if (rc) {
CERROR("failed to initialize catalog %d\n", rc);
GOTO(err_reg, rc);
}
-#endif
- /* FIXME before this set info call is made, we must initialize the logging */
+
+ /* FIXME before set info call is made, we must initialize logging */
rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
0, NULL);
- if (rc)
+ if (rc)
GOTO(err_reg, rc);
-
+
/* If we're mounting this code for the first time on an existing FS,
* we need to populate the objids array from the real OST values */
if (!mds->mds_lov_objids_valid) {
* it can use the obd_recovering flag to determine when the
* the OBD is full available. */
if (!obd->obd_recovering) {
-#ifdef ENABLE_ORPHANS
rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
- obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
- if (rc != 0) {
+ obd->u.mds.mds_lov_desc.ld_tgt_count, NULL,
+ NULL);
+ if (rc != 0)
CERROR("faild at llog_origin_connect: %d\n", rc);
- }
-#endif
+
rc = mds_cleanup_orphans(obd);
if (rc > 0)
CERROR("Cleanup %d orphans while MDS isn't recovering\n", rc);
RETURN(rc);
err_llog:
-#ifdef ENABLE_ORPHANS
/* cleanup all llogging subsystems */
rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
- if (rc)
+ if (rc)
CERROR("failed to cleanup llogging subsystems\n");
-#endif
err_reg:
obd_register_observer(mds->mds_osc_obd, NULL);
err_discon:
ENTRY;
if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
-#ifdef ENABLE_ORPHANS
/* cleanup all llogging subsystems */
rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
- if (rc)
+ if (rc)
CERROR("failed to cleanup llogging subsystems\n");
-#endif
obd_register_observer(mds->mds_osc_obd, NULL);
RETURN(-EBUSY);
push_ctxt(&saved, &obd->obd_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
+ rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
&mds->mds_cfg_llh, NULL, name);
if (rc == 0)
- llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
+ llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
&cfg_uuid);
else
mds->mds_cfg_llh = NULL;
}
case OBD_IOC_PARSE: {
- struct llog_ctxt *ctxt =
+ struct llog_ctxt *ctxt =
llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &obd->obd_ctxt, NULL);
rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
}
case OBD_IOC_DUMP_LOG: {
- struct llog_ctxt *ctxt =
+ struct llog_ctxt *ctxt =
llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &obd->obd_ctxt, NULL);
rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
RETURN(rc);
}
- case OBD_IOC_LLOG_CHECK:
+ case OBD_IOC_LLOG_CHECK:
case OBD_IOC_LLOG_CANCEL:
- case OBD_IOC_LLOG_REMOVE: {
- struct llog_ctxt *ctxt =
+ case OBD_IOC_LLOG_REMOVE: {
+ struct llog_ctxt *ctxt =
llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-#ifdef ENABLE_ORPHANS
obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
-#endif
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-
-#ifdef ENABLE_ORPHANS
llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
-#endif
+
RETURN(rc);
- }
+ }
case OBD_IOC_LLOG_INFO:
case OBD_IOC_LLOG_PRINT: {
- struct llog_ctxt *ctxt =
+ struct llog_ctxt *ctxt =
llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-
+
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-
+
RETURN(rc);
}
GOTO(cleanup, rc);
}
-#ifdef ENABLE_ORPHANS
if (req != NULL &&
(reply_body->valid & OBD_MD_FLEASIZE) &&
mds_log_op_unlink(obd, pending_child->d_inode,
req->rq_repmsg, 1) > 0) {
reply_body->valid |= OBD_MD_FLCOOKIE;
}
-#endif
+
pending_child->d_fsdata = (void *) &dp;
dp.p_inum = 0;
dp.p_ptr = req;
{
struct mds_logcancel_data *mlcd = cb_data;
struct lov_stripe_md *lsm = NULL;
-#ifdef ENABLE_ORPHANS
struct llog_ctxt *ctxt;
-#endif
int rc;
obd_transno_commit_cb(obd, transno, error);
(int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
rc);
} else {
-#ifdef ENABLE_ORPHANS
///* XXX 0 normally, SENDNOW for debug */);
ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1);
rc = llog_cancel(ctxt, lsm,
CERROR("error cancelling %d log cookies: rc %d\n",
(int)(mlcd->mlcd_cookielen /
sizeof(*mlcd->mlcd_cookies)), rc);
-#endif
}
OBD_FREE(mlcd, mlcd->mlcd_size);
cleanup_phase = 4; /* transaction */
rc = vfs_unlink(dparent->d_inode, dchild);
-#ifdef ENABLE_ORPHANS
-
if (!rc && log_unlink)
if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg,
offset + 1) > 0)
body->valid |= OBD_MD_FLCOOKIE;
-#endif
break;
}
case S_IFLNK:
oa->o_mode = body->mode & S_IFMT;
oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
-#ifdef ENABLE_ORPHANS
if (body->valid & OBD_MD_FLCOOKIE) {
oa->o_valid |= OBD_MD_FLCOOKIE;
oti.oti_logcookies =
oa->o_valid &= ~OBD_MD_FLCOOKIE;
body->valid &= ~OBD_MD_FLCOOKIE;
}
-#endif
rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
obdo_free(oa);
CERROR("error %d unlinking orphan %*s from PENDING directory\n",
rc, dchild->d_name.len, dchild->d_name.name);
-#ifdef ENABLE_ORPHANS
if ((body->valid & OBD_MD_FLEASIZE)) {
if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0)
body->valid |= OBD_MD_FLCOOKIE;
}
-#endif
+
if (handle) {
int err = fsfilt_commit(obd, pending_dir, handle, 0);
if (err) {
}
EXPORT_SYMBOL(llog_free_handle);
-/* returns negative on error; 0 if success; 1 if success & log destroyed */
+/* returns negative on error; 0 if success; 1 if success & log destroyed */
int llog_cancel_rec(struct llog_handle *loghandle, int index)
{
struct llog_log_hdr *llh = loghandle->lgh_hdr;
if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
(le32_to_cpu(llh->llh_count) == 1) &&
- (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
+ (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
rc = llog_destroy(loghandle);
if (rc)
CERROR("failure destroying log after last cancel: %d\n",
}
rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
- if (rc)
+ if (rc)
CERROR("failure re-writing header %d\n", rc);
LASSERT(rc == 0);
RETURN(rc);
GOTO(out, rc);
}
rc = 0;
-
+
handle->lgh_last_idx = 0; /* header is record with index 0 */
llh->llh_count = cpu_to_le32(1); /* for the header record */
llh->llh_hdr.lrh_type = cpu_to_le32(LLOG_HDR_MAGIC);
- llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = cpu_to_le32(LLOG_CHUNK_SIZE);
+ llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len =
+ cpu_to_le32(LLOG_CHUNK_SIZE);
llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
llh->llh_timestamp = cpu_to_le64(LTIME_S(CURRENT_TIME));
if (uuid)
memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
- llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh), llh_bitmap));
+ llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh),llh_bitmap));
ext2_set_bit(0, llh->llh_bitmap);
out:
INIT_LIST_HEAD(&handle->u.phd.phd_entry);
else
LBUG();
-
+
if (rc) {
OBD_FREE(llh, sizeof(*llh));
handle->lgh_hdr = NULL;
}
EXPORT_SYMBOL(llog_close);
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data)
+int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata)
{
struct llog_log_hdr *llh = loghandle->lgh_hdr;
+ struct llog_process_cat_data *cd = catdata;
void *buf;
__u64 cur_offset = LLOG_CHUNK_SIZE;
- int rc = 0, index = 1;
+ int rc = 0, index = 1, last_index, idx;
int saved_index = 0;
ENTRY;
if (!buf)
RETURN(-ENOMEM);
+ if (cd != NULL)
+ index = cd->first_idx + 1;
+ if (cd != NULL && cd->last_idx)
+ last_index = cd->last_idx;
+ else
+ last_index = LLOG_BITMAP_BYTES * 8 - 1;
+
+
while (rc == 0) {
struct llog_rec_hdr *rec;
-
+
/* skip records not set in bitmap */
- while (index < (LLOG_BITMAP_BYTES * 8) &&
+ while (index <= last_index &&
!ext2_test_bit(index, llh->llh_bitmap))
++index;
- LASSERT(index <= LLOG_BITMAP_BYTES * 8);
- if (index == LLOG_BITMAP_BYTES * 8)
+ LASSERT(index <= last_index + 1);
+ if (index == last_index + 1)
break;
/* get the buf with our target record; avoid old garbage */
memset(buf, 0, LLOG_CHUNK_SIZE);
- rc = llog_next_block(loghandle, &saved_index, index,
+ rc = llog_next_block(loghandle, &saved_index, index,
&cur_offset, buf, LLOG_CHUNK_SIZE);
if (rc)
GOTO(out, rc);
rec = buf;
- index = le32_to_cpu(rec->lrh_index);
+ idx = le32_to_cpu(rec->lrh_index);
+ if (idx < index)
+ CDEBUG(D_HA, "index %u : idx %u\n", index, idx);
+ while (idx < index) {
+ rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ idx ++;
+ }
/* process records in buffer, starting where we found one */
while ((void *)rec < buf + LLOG_CHUNK_SIZE) {
/* if set, process the callback on this record */
if (ext2_test_bit(index, llh->llh_bitmap)) {
rc = cb(loghandle, rec, data);
- if (rc)
+ if (rc == LLOG_PROC_BREAK) {
+ CWARN("recovery from log: "LPX64":%x"
+ " stopped\n",
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_ogen);
+ GOTO(out, rc);
+ }
+ if (rc)
GOTO(out, rc);
}
/* next record, still in buffer? */
++index;
- if (index > LLOG_BITMAP_BYTES * 8 - 1)
+ if (index > last_index)
GOTO(out, rc = 0);
rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
}
struct llog_handle *loghandle;
struct llog_log_hdr *llh;
struct llog_logid_rec rec;
- int rc, index, bitmap_size, i;
+ int rc, index, bitmap_size;
ENTRY;
+ llh = cathandle->lgh_hdr;
+ bitmap_size = sizeof(llh->llh_bitmap) * 8;
+
+ index = (cathandle->lgh_last_idx + 1) % bitmap_size;
+
+ /* maximum number of available slots in catlog is bitmap_size - 2 */
+ if (llh->llh_cat_idx == cpu_to_le32(index)) {
+ CERROR("no free catalog slots for log...\n");
+ RETURN(ERR_PTR(-ENOSPC));
+ } else {
+ if (index == 0)
+ index = 1;
+ if (ext2_set_bit(index, llh->llh_bitmap)) {
+ CERROR("argh, index %u already set in log bitmap?\n",
+ index);
+ LBUG(); /* should never happen */
+ }
+ cathandle->lgh_last_idx = index;
+ llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
+ llh->llh_tail.lrt_index = cpu_to_le32(index);
+ }
+
rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
if (rc)
RETURN(ERR_PTR(rc));
- rc = llog_init_handle(loghandle,
- LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
+ rc = llog_init_handle(loghandle,
+ LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
&cathandle->lgh_hdr->llh_tgtuuid);
if (rc)
GOTO(out_destroy, rc);
- /* Find first free entry */
- llh = cathandle->lgh_hdr;
- bitmap_size = sizeof(llh->llh_bitmap) * 8;
- for (i = 0, index = le32_to_cpu(llh->llh_count); i < bitmap_size;
- i++, index++) {
- index %= bitmap_size;
- if (ext2_set_bit(index, llh->llh_bitmap)) {
- /* XXX This should trigger log clean up or similar */
- CERROR("catalog index %d is still in use\n", index);
- } else {
- cathandle->lgh_last_idx = index;
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
- break;
- }
- }
- if (i == bitmap_size) {
- CERROR("no free catalog slots for log...\n");
- GOTO(out_destroy, rc = -ENOSPC);
- }
CWARN("new recovery log "LPX64":%x for index %u of catalog "LPX64"\n",
loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, index,
cathandle->lgh_id.lgl_oid);
rec.lid_tail.lrt_index = cpu_to_le32(index);
/* update the catalog: header and record */
- rc = llog_write_rec(cathandle, &rec.lid_hdr,
+ rc = llog_write_rec(cathandle, &rec.lid_hdr,
&loghandle->u.phd.phd_cookie, 1, NULL, index);
if (rc < 0) {
GOTO(out_destroy, rc);
}
EXPORT_SYMBOL(llog_cat_new_log);
-/* Assumes caller has already pushed us into the kernel context and is locking.
+/* Open an existent log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ *
+ * Assumes caller has already pushed us into the kernel context and is locking.
* We return a lock on the handle to ensure nobody yanks it from us.
*/
int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
if (cathandle == NULL)
RETURN(-EBADF);
- list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
+ list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
u.phd.phd_entry) {
struct llog_logid *cgl = &loghandle->lgh_id;
if (cgl->lgl_oid == logid->lgl_oid) {
continue;
}
loghandle->u.phd.phd_cat_handle = cathandle;
- cathandle->u.chd.chd_current_log = loghandle;
GOTO(out, rc = 0);
}
}
} else {
rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
if (!rc) {
- list_add(&loghandle->u.phd.phd_entry,
+ list_add(&loghandle->u.phd.phd_entry,
&cathandle->u.chd.chd_head);
- cathandle->u.chd.chd_current_log = loghandle;
}
}
if (!rc) {
loghandle->u.phd.phd_cat_handle = cathandle;
loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
- loghandle->u.phd.phd_cookie.lgc_index =
+ loghandle->u.phd.phd_cookie.lgc_index =
le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
}
int rc;
ENTRY;
- list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
+ list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
u.phd.phd_entry) {
int err = llog_close(loghandle);
if (err)
*
* NOTE: loghandle is write-locked upon successful return
*/
-static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
+static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
int create)
{
struct llog_handle *loghandle = NULL;
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
+ if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
down_write(&loghandle->lgh_lock);
up_read(&cathandle->lgh_lock);
RETURN(loghandle);
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
+ if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
down_write(&loghandle->lgh_lock);
up_write(&cathandle->lgh_lock);
RETURN(loghandle);
* Assumes caller has already pushed us into the kernel context.
*/
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
- struct llog_cookie *reccookie, void *buf)
+ struct llog_cookie *reccookie, void *buf)
{
struct llog_handle *loghandle;
int rc;
/* loghandle is already locked by llog_cat_current_log() for us */
rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
up_write(&loghandle->lgh_lock);
+
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_add_rec);
down_write(&loghandle->lgh_lock);
rc = llog_cancel_rec(loghandle, cookies->lgc_index);
up_write(&loghandle->lgh_lock);
-
+
if (rc == 1) { /* log has been destroyed */
index = loghandle->u.phd.phd_cookie.lgc_index;
if (cathandle->u.chd.chd_current_log == loghandle)
cathandle->u.chd.chd_current_log = NULL;
llog_free_handle(loghandle);
-
+
LASSERT(index);
+ llog_cat_set_first_idx(cathandle, index);
rc = llog_cancel_rec(cathandle, index);
+ if (rc == 0)
+ CDEBUG(D_HA, "cancel plain log at index %u "
+ "of catalog "LPX64"\n",
+ index, cathandle->lgh_id.lgl_oid);
}
}
up_write(&cathandle->lgh_lock);
}
EXPORT_SYMBOL(llog_cat_cancel_records);
-int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, void *data)
+int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
+ void *data)
{
struct llog_process_data *d = data;
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
- CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+ CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
if (rc) {
- CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid);
+ CERROR("Cannot find handle for log "LPX64"\n",
+ lir->lid_id.lgl_oid);
RETURN(rc);
- }
+ }
- rc = llog_process(llh, d->lpd_cb, d->lpd_data);
+ rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
RETURN(rc);
}
int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
{
struct llog_process_data d;
+ struct llog_process_cat_data cd;
+ struct llog_log_hdr *llh = cat_llh->lgh_hdr;
int rc;
ENTRY;
+
+ LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
d.lpd_data = data;
d.lpd_cb = cb;
- rc = llog_process(cat_llh, llog_cat_process_cb, &d);
+ if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+ CWARN("catlog "LPX64" crosses index zero\n",
+ cat_llh->lgh_id.lgl_oid);
+
+ cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+ cd.last_idx = 0;
+ rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
+ if (rc != 0)
+ RETURN(rc);
+
+ cd.first_idx = 0;
+ cd.last_idx = cat_llh->lgh_last_idx;
+ rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
+ } else {
+ rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
+ }
+
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_process);
+int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
+{
+ struct llog_log_hdr *llh = cathandle->lgh_hdr;
+ int i, bitmap_size, idx;
+ ENTRY;
+
+ bitmap_size = sizeof(llh->llh_bitmap) * 8;
+ if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
+ idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+ llh->llh_cat_idx = cpu_to_le32(idx);
+ if (idx == cathandle->lgh_last_idx)
+ goto out;
+ for (i = (index + 1) % bitmap_size;
+ i != cathandle->lgh_last_idx;
+ i = (i + 1) % bitmap_size) {
+ if (!ext2_test_bit(i, llh->llh_bitmap)) {
+ idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+ llh->llh_cat_idx = cpu_to_le32(idx);
+ } else if (i == 0) {
+ llh->llh_cat_idx = 0;
+ } else {
+ break;
+ }
+ }
+out:
+ CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
+ cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
+ }
+
+ RETURN(0);
+}
#if 0
/* Assumes caller has already pushed us into the kernel context. */
if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
-write_hdr:
+write_hdr:
rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
&offset);
if (rc != LLOG_CHUNK_SIZE) {
static int str2logid(struct llog_logid *logid, char *str, int len)
{
char *start, *end, *endp;
-
+
start = str;
if (*start != '#')
RETURN(-EINVAL);
-
+
start++;
if (start - str >= len - 1)
RETURN(-EINVAL);
RETURN(-EINVAL);
*end = '\0';
- logid->lgl_oid = simple_strtoull(start, &endp, 16);
+ logid->lgl_oid = simple_strtoull(start, &endp, 16);
if (endp != end)
RETURN(-EINVAL);
RETURN(0);
}
-static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
+static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
void *data)
{
struct obd_ioctl_data *ioc_data = (struct obd_ioctl_data *)data;
static char *out;
char *endp;
int cur_index, rc = 0;
-
+
cur_index = le32_to_cpu(rec->lrh_index);
-
+
if (ioc_data && (ioc_data->ioc_inllen1)) {
l = 0;
- remains = ioc_data->ioc_inllen4 +
+ remains = ioc_data->ioc_inllen4 +
size_round(ioc_data->ioc_inllen1) +
size_round(ioc_data->ioc_inllen2) +
size_round(ioc_data->ioc_inllen3);
}
if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
- struct llog_handle *log_handle;
-
- if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
- l = snprintf(out, remains,
- "[index]: %05d [type]: %02x [len]: %04d failed\n",
+ struct llog_handle *log_handle;
+
+ if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+ l = snprintf(out, remains, "[index]: %05d [type]: "
+ "%02x [len]: %04d failed\n",
cur_index, le32_to_cpu(rec->lrh_type),
le32_to_cpu(rec->lrh_len));
}
if (handle->lgh_ctxt == NULL)
RETURN(-EOPNOTSUPP);
llog_cat_id2handle(handle, &log_handle, &lir->lid_id);
- rc = llog_process(log_handle, llog_check_cb, NULL);
+ rc = llog_process(log_handle, llog_check_cb, NULL, NULL);
llog_close(log_handle);
} else {
switch (le32_to_cpu(rec->lrh_type)) {
case OST_SZ_REC:
- case OST_RAID1_REC:
+ case OST_RAID1_REC:
case MDS_UNLINK_REC:
- case OBD_CFG_REC:
- case PTL_CFG_REC:
- case LLOG_HDR_MAGIC: {
- l = snprintf(out, remains,
- "[index]: %05d [type]: %02x [len]: %04d ok\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ case OBD_CFG_REC:
+ case PTL_CFG_REC:
+ case LLOG_HDR_MAGIC: {
+ l = snprintf(out, remains, "[index]: %05d [type]: "
+ "%02x [len]: %04d ok\n",
+ cur_index, le32_to_cpu(rec->lrh_type),
+ le32_to_cpu(rec->lrh_len));
out += l;
remains -= l;
if (remains <= 0) {
- CERROR("not enough space for print log records\n");
+ CERROR("no space to print log records\n");
RETURN(-LLOG_EEMPTY);
}
RETURN(0);
}
default: {
- l = snprintf(out, remains,
- "[index]: %05d [type]: %02x [len]: %04d failed\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ l = snprintf(out, remains, "[index]: %05d [type]: "
+ "%02x [len]: %04d failed\n",
+ cur_index, le32_to_cpu(rec->lrh_type),
+ le32_to_cpu(rec->lrh_len));
out += l;
remains -= l;
if (remains <= 0) {
- CERROR("not enough space for print log records\n");
+ CERROR("no space to print log records\n");
RETURN(-LLOG_EEMPTY);
}
RETURN(0);
- }
+ }
}
}
RETURN(rc);
}
-static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
+static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
void *data)
{
struct obd_ioctl_data *ioc_data = (struct obd_ioctl_data *)data;
static char *out;
char *endp;
int cur_index;
-
+
if (ioc_data->ioc_inllen1) {
l = 0;
- remains = ioc_data->ioc_inllen4 +
+ remains = ioc_data->ioc_inllen4 +
size_round(ioc_data->ioc_inllen1) +
size_round(ioc_data->ioc_inllen2) +
size_round(ioc_data->ioc_inllen3);
l = snprintf(out, remains,
"[index]: %05d [logid]: #%llx#%llx#%08x\n",
- cur_index, lir->lid_id.lgl_oid,
+ cur_index, lir->lid_id.lgl_oid,
lir->lid_id.lgl_ogr, lir->lid_id.lgl_ogen);
} else {
l = snprintf(out, remains,
- "[index]: %05d [type]: %02x [len]: %04d\n",
+ "[index]: %05d [type]: %02x [len]: %04d\n",
cur_index, le32_to_cpu(rec->lrh_type),
le32_to_cpu(rec->lrh_len));
}
{
struct llog_handle *log;
int rc, index = 0;
-
+
down_write(&cat->lgh_lock);
rc = llog_cat_id2handle(cat, &log, logid);
if (rc) {
logid->lgl_oid, logid->lgl_ogr, logid->lgl_ogen);
GOTO(out, rc = -ENOENT);
}
-
+
index = log->u.phd.phd_cookie.lgc_index;
LASSERT(index);
rc = llog_destroy(log);
CDEBUG(D_IOCTL, "cannot destroy log\n");
GOTO(out, rc);
}
+ llog_cat_set_first_idx(cat, index);
rc = llog_cancel_rec(cat, index);
out:
llog_free_handle(log);
RETURN(rc);
}
-static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
- void *data)
+
+static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
+ void *data)
{
struct llog_logid_rec *lir = (struct llog_logid_rec*)rec;
int rc;
-
+
if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC))
- return (-EINVAL);
+ return (-EINVAL);
rc = llog_remove_log(handle, &lir->lid_id);
-
+
RETURN(rc);
}
struct llog_logid logid;
int err = 0;
struct llog_handle *handle = NULL;
-
+
if (*data->ioc_inlbuf1 == '#') {
err = str2logid(&logid, data->ioc_inlbuf1, data->ioc_inllen1);
if (err)
GOTO(out, err);
err = llog_create(ctxt, &handle, &logid, NULL);
if (err)
- GOTO(out, err);
+ GOTO(out, err);
} else if (*data->ioc_inlbuf1 == '$') {
char *name = data->ioc_inlbuf1 + 1;
err = llog_create(ctxt, &handle, NULL, name);
}
err = llog_init_handle(handle, 0, NULL);
- if (err)
+ if (err)
GOTO(out_close, err = -ENOENT);
-
+
switch (cmd) {
case OBD_IOC_LLOG_INFO: {
int l;
- int remains = data->ioc_inllen2 +
+ int remains = data->ioc_inllen2 +
size_round(data->ioc_inllen1);
char *out = data->ioc_bulk;
- l = snprintf(out, remains,
+ l = snprintf(out, remains,
"logid: #%llx#%llx#%08x\n"
"flags: %x (%s)\n"
"records count: %d\n"
handle->lgh_id.lgl_oid, handle->lgh_id.lgl_ogr,
handle->lgh_id.lgl_ogen,
le32_to_cpu(handle->lgh_hdr->llh_flags),
- le32_to_cpu(handle->lgh_hdr->llh_flags) &
+ le32_to_cpu(handle->lgh_hdr->llh_flags) &
LLOG_F_IS_CAT ? "cat" : "plain",
le32_to_cpu(handle->lgh_hdr->llh_count),
handle->lgh_last_idx);
out += l;
remains -= l;
- if (remains <= 0)
+ if (remains <= 0)
CERROR("not enough space for log header info\n");
GOTO(out_close, err);
}
case OBD_IOC_LLOG_CHECK: {
LASSERT(data->ioc_inllen1);
- err = llog_process(handle, llog_check_cb, data);
+ err = llog_process(handle, llog_check_cb, data, NULL);
if (err == -LLOG_EEMPTY)
err = 0;
GOTO(out_close, err);
case OBD_IOC_LLOG_PRINT: {
LASSERT(data->ioc_inllen1);
- err = llog_process(handle, llog_print_cb, data);
+ err = llog_process(handle, llog_print_cb, data, NULL);
if (err == -LLOG_EEMPTY)
err = 0;
struct llog_cookie cookie;
struct llog_logid plain;
char *endp;
-
+
if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
GOTO(out_close, err = -EINVAL);
-
+
err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
if (err)
GOTO(out_close, err);
cookie.lgc_lgl = plain;
- cookie.lgc_index = simple_strtoul(data->ioc_inlbuf3,
- &endp, 0);
+ cookie.lgc_index = simple_strtoul(data->ioc_inlbuf3, &endp, 0);
if (*endp != '\0')
GOTO(out_close, err = -EINVAL);
}
case OBD_IOC_LLOG_REMOVE: {
struct llog_logid plain;
-
+
if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
GOTO(out_close, err = -EINVAL);
-
+
if (data->ioc_inlbuf2) {
/*remove indicate log from the catalog*/
- err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
+ err = str2logid(&plain, data->ioc_inlbuf2,
+ data->ioc_inllen2);
if (err)
GOTO(out_close, err);
err = llog_remove_log(handle, &plain);
} else {
/*remove all the log of the catalog*/
- llog_process(handle, llog_delete_cb, NULL);
+ llog_process(handle, llog_delete_cb, NULL, NULL);
}
GOTO(out_close, err);
}
}
-
+
out_close:
- if (handle->lgh_hdr &&
+ if (handle->lgh_hdr &&
handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))
llog_cat_put(handle);
else
}
EXPORT_SYMBOL(llog_ioctl);
-int llog_catlog_list(struct obd_device *obd, int count,
+int llog_catlog_list(struct obd_device *obd, int count,
struct obd_ioctl_data *data)
{
int size, i;
char name[32] = "CATLIST";
char *out;
int l, remains, rc = 0;
-
+
size = sizeof(*idarray) * count;
-
+
OBD_ALLOC(idarray, size);
if (!idarray)
RETURN(-ENOMEM);
memset(idarray, 0, size);
-
+
rc = llog_get_cat_list(obd, obd, name, count, idarray);
if (rc) {
OBD_FREE(idarray, size);
RETURN(rc);
}
-
+
out = data->ioc_bulk;
remains = data->ioc_inllen1;
id = idarray;
for (i = 0; i < count; i++) {
- l = snprintf(out, remains,
+ l = snprintf(out, remains,
"catalog log: #%llx#%llx#%08x\n",
id->lgl_oid, id->lgl_ogr, id->lgl_ogen);
id++;
}
file->f_pos += len - sizeof(rec) - sizeof(tail);
- rc = fsfilt_write_record(obd, file, &tail, sizeof(tail), &file->f_pos, 0);
+ rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
if (rc) {
CERROR("error writing padding record: rc %d\n", rc);
goto out;
file->f_pos = off;
if (!buf) {
- rc = fsfilt_write_record(obd, file, rec, buflen, &file->f_pos, 0);
+ rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
if (rc) {
CERROR("error writing log record: rc %d\n", rc);
goto out;
static int llog_lvfs_read_header(struct llog_handle *handle)
{
- struct llog_rec_tail tail;
struct obd_device *obd;
int rc;
ENTRY;
LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
-
+
obd = handle->lgh_ctxt->loc_exp->exp_obd;
if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
if (rc)
CERROR("error reading log header\n");
- rc = llog_lvfs_read_blob(obd, handle->lgh_file, &tail, sizeof(tail),
- handle->lgh_file->f_dentry->d_inode->i_size -
- sizeof(tail));
- if (rc)
- CERROR("error reading log tail\n");
-
- handle->lgh_last_idx = le32_to_cpu(tail.lrt_index);
+ handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
RETURN(rc);
if (rc)
RETURN(rc);
- if (idx != -1) {
+ if (idx != -1) {
loff_t saved_offset;
/* no header: only allowed to insert record 1 */
if (rc || idx == 0)
RETURN(rc);
- saved_offset = sizeof(*llh) + (idx-1) * le32_to_cpu(rec->lrh_len);
+ saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
if (rc == 0 && reccookie) {
reccookie->lgc_lgl = loghandle->lgh_id;
reccookie->lgc_index = idx;
rc = 1;
- }
+ }
RETURN(rc);
}
* big enough to hold reclen, so all we care about is padding here.
*/
left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
- if (buf)
- reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
+ if (buf)
+ reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
sizeof(struct llog_rec_tail);
/* NOTE: padding is a record, but no bit is set */
- if (left != 0 && left != reclen &&
+ if (left != 0 && left != reclen &&
left < (reclen + LLOG_MIN_REC_SIZE)) {
loghandle->lgh_last_idx++;
rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
LBUG(); /* should never happen */
}
llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
+ llh->llh_tail.lrt_index = cpu_to_le32(index);
offset = 0;
rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
- else
+ else
reccookie->lgc_subsys = -1;
rc = 1;
}
+ if (rc == 0 && le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC)
+ rc = 1;
+
RETURN(rc);
}
{
if (goal <= curr)
return;
- *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
+ *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
~(LLOG_CHUNK_SIZE - 1);
}
CERROR("Cant read llog block at log id "LPU64
"/%u offset "LPU64"\n",
loghandle->lgh_id.lgl_oid,
- loghandle->lgh_id.lgl_ogen,
+ loghandle->lgh_id.lgl_ogen,
*cur_offset);
RETURN(rc);
}
} else {
filp = l_filp_open(logname, flags, mode);
if (IS_ERR(filp))
- CERROR("logfile creation %s: %ld\n", logname,
+ CERROR("logfile creation %s: %ld\n", logname,
PTR_ERR(filp));
}
- OBD_FREE(logname, PATH_MAX);
+ OBD_FREE(logname, PATH_MAX);
return filp;
}
if (IS_ERR(dchild)) {
rc = PTR_ERR(dchild);
- CERROR("error looking up log file "LPX64":0x%x: rc %d\n",
+ CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
logid->lgl_oid, logid->lgl_ogen, rc);
GOTO(cleanup, rc);
}
LBUG();
return 0;
}
+
static int llog_lvfs_write_rec(struct llog_handle *loghandle,
struct llog_rec_hdr *rec,
- struct llog_cookie *reccookie, int cookiecount,
+ struct llog_cookie *reccookie, int cookiecount,
void *buf, int idx)
{
LBUG();
return 0;
}
+
static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
int next_idx, __u64 *cur_offset, void *buf,
int len)
LBUG();
return 0;
}
-static int llog_lvfs_create(struct llog_obd_ctxt *ctxt, struct llog_handle **res,
+
+static int llog_lvfs_create(struct llog_obd_ctxt *ctxt,struct llog_handle **res,
struct llog_logid *logid, char *name)
{
LBUG();
return 0;
}
+
static int llog_lvfs_close(struct llog_handle *handle)
{
LBUG();
return 0;
}
+
static int llog_lvfs_destroy(struct llog_handle *handle)
{
LBUG();
return 0;
}
-int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
+int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
char *name, int count, struct llog_logid *idarray)
{
LBUG();
return 0;
}
-int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
+int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
char *name, int count, struct llog_logid *idarray)
{
LBUG();
/* helper functions for calling the llog obd methods */
-int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
+int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
int count, struct llog_logid *logid, struct llog_operations *op)
{
int rc = 0;
if (op->lop_setup)
rc = op->lop_setup(obd, index, disk_obd, count, logid);
- if (ctxt && rc)
+ if (ctxt && rc)
OBD_FREE(ctxt, sizeof(*ctxt));
RETURN(rc);
int rc = 0;
ENTRY;
- down(&ctxt->loc_sem);
LASSERT(ctxt);
if (CTXTP(ctxt, cleanup))
ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
class_export_put(ctxt->loc_exp);
ctxt->loc_exp = NULL;
- up(&ctxt->loc_sem);
OBD_FREE(ctxt, sizeof(*ctxt));
RETURN(rc);
if (!ctxt)
RETURN(0);
- down(&ctxt->loc_sem);
- if (ctxt->loc_llcd && CTXTP(ctxt, sync))
+
+ if (CTXTP(ctxt, sync))
rc = CTXTP(ctxt, sync)(ctxt, exp);
- else
- up(&ctxt->loc_sem);
RETURN(rc);
}
EXPORT_SYMBOL(llog_sync);
-int llog_add(struct llog_ctxt *ctxt,
- struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
- struct llog_cookie *logcookies, int numcookies)
+int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+ struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
+ int numcookies)
{
int rc;
ENTRY;
LASSERT(ctxt);
- down(&ctxt->loc_sem);
CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies);
- up(&ctxt->loc_sem);
RETURN(rc);
}
EXPORT_SYMBOL(llog_add);
EXPORT_SYMBOL(llog_cancel);
/* callback func for llog_process in llog_obd_origin_setup */
-static int cat_cancel_cb(struct llog_handle *cathandle,
+static int cat_cancel_cb(struct llog_handle *cathandle,
struct llog_rec_hdr *rec, void *data)
{
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
- CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+ CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
if (rc) {
- CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid);
+ CERROR("Cannot find handle for log "LPX64"\n",
+ lir->lid_id.lgl_oid);
RETURN(rc);
- }
-
+ }
+
llh = loghandle->lgh_hdr;
if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
(le32_to_cpu(llh->llh_count) == 1)) {
rc = llog_destroy(loghandle);
if (rc)
- CERROR("failure destroying log during postsetup: %d\n", rc);
+ CERROR("failure destroying log in postsetup: %d\n", rc);
LASSERT(rc == 0);
index = loghandle->u.phd.phd_cookie.lgc_index;
- if (cathandle->u.chd.chd_current_log == loghandle)
- cathandle->u.chd.chd_current_log = NULL;
llog_free_handle(loghandle);
-
+
LASSERT(index);
+ llog_cat_set_first_idx(cathandle, index);
rc = llog_cancel_rec(cathandle, index);
if (rc == 0)
- CWARN("cancel log "LPX64":%x at index %u of catalog "LPX64"\n",
- lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
- le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
+ CWARN("cancel log "LPX64":%x at index %u of catalog "
+ LPX64"\n", lir->lid_id.lgl_oid,
+ lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index),
+ cathandle->lgh_id.lgl_oid);
}
RETURN(rc);
/* lop_setup method for filter/osc */
// XXX how to set exports
-int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
- int count, struct llog_logid *logid)
+int llog_obd_origin_setup(struct obd_device *obd, int index,
+ struct obd_device *disk_obd, int count,
+ struct llog_logid *logid)
{
struct llog_ctxt *ctxt;
struct llog_handle *handle;
RETURN(0);
LASSERT(count == 1);
-
+
ctxt = llog_get_context(obd, index);
LASSERT(ctxt);
- log_gen_init(ctxt);
+ llog_gen_init(ctxt);
- down(&ctxt->loc_sem);
if (logid->lgl_oid)
rc = llog_create(ctxt, &handle, logid, NULL);
else {
rc = llog_create(ctxt, &handle, NULL, NULL);
- if (!rc)
+ if (!rc)
*logid = handle->lgh_id;
}
- if (rc)
+ if (rc)
GOTO(out, rc);
ctxt->loc_handle = handle;
if (rc)
GOTO(out, rc);
- rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL);
- if (rc)
+ rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
+ if (rc)
CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
out:
- up(&ctxt->loc_sem);
if (ctxt && rc) {
obd->obd_llog_ctxt[index] = NULL;
OBD_FREE(ctxt, sizeof(*ctxt));
struct llog_log_hdr *llh;
int rc, index;
ENTRY;
-
+
if (!ctxt)
return 0;
cathandle = ctxt->loc_handle;
if (cathandle) {
- list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
- u.phd.phd_entry) {
+ list_for_each_entry_safe(loghandle, n,
+ &cathandle->u.chd.chd_head,
+ u.phd.phd_entry) {
llh = loghandle->lgh_hdr;
- if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
+ if ((le32_to_cpu(llh->llh_flags) &
+ LLOG_F_ZAP_WHEN_EMPTY) &&
(le32_to_cpu(llh->llh_count) == 1)) {
rc = llog_destroy(loghandle);
if (rc)
- CERROR("failure destroying log during cleanup: %d\n",
- rc);
+ CERROR("failure destroying log during "
+ "cleanup: %d\n", rc);
LASSERT(rc == 0);
index = loghandle->u.phd.phd_cookie.lgc_index;
- if (cathandle->u.chd.chd_current_log == loghandle)
- cathandle->u.chd.chd_current_log = NULL;
llog_free_handle(loghandle);
-
+
LASSERT(index);
+ llog_cat_set_first_idx(cathandle, index);
rc = llog_cancel_rec(cathandle, index);
if (rc == 0)
- CWARN("cancel plain log at index %u of catalog "LPX64"\n",
+ CWARN("cancel plain log at index %u "
+ "of catalog "LPX64"\n",
index, cathandle->lgh_id.lgl_oid);
}
}
CERROR("rc: %d\n", rc);
GOTO(out, rc);
}
-
+
out:
OBD_FREE(idarray, size);
RETURN(rc);
int rc2;
ENTRY;
- CERROR("1a: create a log with name: %s\n", name);
+ CWARN("1a: create a log with name: %s\n", name);
LASSERT(ctxt);
rc = llog_create(ctxt, &llh, NULL, name);
GOTO(out, rc);
out:
- CERROR("1b: close newly-created log\n");
+ CWARN("1b: close newly-created log\n");
rc2 = llog_close(llh);
if (rc2) {
CERROR("1b: close log %s failed: %d\n", name, rc2);
struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
ENTRY;
- CERROR("2a: re-open a log with name: %s\n", name);
+ CWARN("2a: re-open a log with name: %s\n", name);
rc = llog_create(ctxt, llh, NULL, name);
if (rc) {
CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
if ((rc = verify_handle("2", *llh, 1)))
RETURN(rc);
- CERROR("2b: create a log without specified NAME & LOGID\n");
+ CWARN("2b: create a log without specified NAME & LOGID\n");
rc = llog_create(ctxt, &loghandle, NULL, NULL);
if (rc) {
CERROR("2b: create log failed\n");
logid = loghandle->lgh_id;
llog_close(loghandle);
- CERROR("2b: re-open the log by LOGID\n");
+ CWARN("2b: re-open the log by LOGID\n");
rc = llog_create(ctxt, &loghandle, &logid, NULL);
if (rc) {
CERROR("2b: re-open log by LOGID failed\n");
}
llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
- CERROR("2b: destroy this log\n");
+ CWARN("2b: destroy this log\n");
rc = llog_destroy(loghandle);
if (rc) {
CERROR("2b: destroy log failed\n");
lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = cpu_to_le32(sizeof(lcr));
lcr.lcr_hdr.lrh_type = cpu_to_le32(OST_SZ_REC);
- CERROR("3a: write one create_rec\n");
+ CWARN("3a: write one create_rec\n");
rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1);
num_recs++;
if (rc) {
if ((rc = verify_handle("3a", llh, num_recs)))
RETURN(rc);
- CERROR("3b: write 10 cfg log records with 8 bytes bufs\n");
+ CWARN("3b: write 10 cfg log records with 8 bytes bufs\n");
for (i = 0; i < 10; i++) {
struct llog_rec_hdr hdr;
char buf[8];
if ((rc = verify_handle("3b", llh, num_recs)))
RETURN(rc);
- CERROR("3c: write 1000 more log records\n");
+ CWARN("3c: write 1000 more log records\n");
for (i = 0; i < 1000; i++) {
rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1);
if (rc) {
lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
sprintf(name, "%x", llog_test_rand+1);
- CERROR("4a: create a catalog log with name: %s\n", name);
+ CWARN("4a: create a catalog log with name: %s\n", name);
rc = llog_create(ctxt, &cath, NULL, name);
if (rc) {
CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
num_recs++;
cat_logid = cath->lgh_id;
- CERROR("4b: write 1 record into the catalog\n");
+ CWARN("4b: write 1 record into the catalog\n");
rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, &cookie, NULL);
if (rc != 1) {
CERROR("4b: write 1 catalog record failed at: %d\n", rc);
if ((rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs)))
RETURN(rc);
- CERROR("4c: cancel 1 log record\n");
+ CWARN("4c: cancel 1 log record\n");
rc = llog_cat_cancel_records(cath, 1, &cookie);
if (rc) {
CERROR("4c: cancel 1 catalog based record failed: %d\n", rc);
if ((rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs)))
RETURN(rc);
- CERROR("4d: write 40,000 more log records\n");
+ CWARN("4d: write 40,000 more log records\n");
for (i = 0; i < 40000; i++) {
rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, NULL, NULL);
if (rc) {
num_recs++;
}
- CERROR("4e: add 5 large records, one record per block\n");
+ CWARN("4e: add 5 large records, one record per block\n");
buflen = LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
- sizeof(struct llog_rec_tail);
OBD_ALLOC(buf, buflen);
OBD_FREE(buf, buflen);
out:
- CERROR("4f: put newly-created catalog\n");
+ CWARN("4f: put newly-created catalog\n");
rc = llog_cat_put(cath);
if (rc)
CERROR("1b: close log %s failed: %d\n", name, rc);
RETURN(-EINVAL);
}
- CERROR("seeing record at index %d - "LPX64":%x in log "LPX64"\n",
+ CWARN("seeing record at index %d - "LPX64":%x in log "LPX64"\n",
le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid,
lir->lid_id.lgl_ogen, llh->lgh_id.lgl_oid);
RETURN(0);
static int plain_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
void *data)
{
- if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+ if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
- CERROR("seeing record at index %d in log "LPX64"\n",
+ CWARN("seeing record at index %d in log "LPX64"\n",
le32_to_cpu(rec->lrh_index), llh->lgh_id.lgl_oid);
RETURN(0);
}
struct llog_cookie cookie;
static int i = 0;
- if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+ if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
cpu_to_le32(LLOG_MIN_REC_SIZE);
lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
- CERROR("5a: re-open catalog by id\n");
+ CWARN("5a: re-open catalog by id\n");
rc = llog_create(ctxt, &llh, &cat_logid, NULL);
if (rc) {
CERROR("5a: llog_create with logid failed: %d\n", rc);
}
llog_init_handle(llh, LLOG_F_IS_CAT, &uuid);
- CERROR("5b: print the catalog entries.. we expect 2\n");
- rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5");
+ CWARN("5b: print the catalog entries.. we expect 2\n");
+ rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5", NULL);
if (rc) {
CERROR("5b: process with cat_print_cb failed: %d\n", rc);
GOTO(out, rc);
}
- CERROR("5c: Cancel 40000 records, see one log zapped\n");
+ CWARN("5c: Cancel 40000 records, see one log zapped\n");
rc = llog_cat_process(llh, llog_cancel_rec_cb, "foobar");
if (rc != -4711) {
CERROR("5c: process with cat_cancel_cb failed: %d\n", rc);
GOTO(out, rc);
}
- CERROR("5d: add 1 record to the log with many canceled empty pages\n");
+ CWARN("5d: add 1 record to the log with many canceled empty pages\n");
rc = llog_cat_add_rec(llh, &lmr.lmr_hdr, NULL, NULL);
if (rc) {
CERROR("5d: add record to the log with many canceled empty\
GOTO(out, rc);
}
- CERROR("5b: print the catalog entries.. we expect 1\n");
- rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5");
+ CWARN("5b: print the catalog entries.. we expect 1\n");
+ rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5", NULL);
if (rc) {
CERROR("5b: process with cat_print_cb failed: %d\n", rc);
GOTO(out, rc);
}
- CERROR("5e: print plain log entries.. expect 6\n");
+ CWARN("5e: print plain log entries.. expect 6\n");
rc = llog_cat_process(llh, plain_print_cb, "foobar");
if (rc) {
CERROR("5e: process with plain_print_cb failed: %d\n", rc);
}
out:
- CERROR("5: close re-opened catalog\n");
+ CWARN("5: close re-opened catalog\n");
if (llh)
rc = llog_cat_put(llh);
if (rc)
struct llog_ctxt *nctxt;
int rc;
- CERROR("6a: re-open log %s using client API\n", name);
+ CWARN("6a: re-open log %s using client API\n", name);
mdc_obd = class_find_client_obd(mds_uuid, LUSTRE_MDC_NAME, NULL);
if (mdc_obd == NULL) {
CERROR("6: no MDC devices connected to %s found.\n",
GOTO(parse_out, rc);
}
- rc = llog_process(llh, (llog_cb_t)plain_print_cb, NULL);
+ rc = llog_process(llh, (llog_cb_t)plain_print_cb, NULL, NULL);
if (rc)
CERROR("6: llog_process failed %d\n", rc);
if (rc)
GOTO(parse_out, rc);
- rc = llog_process(llh, class_config_llog_handler, cfg);
+ rc = llog_process(llh, class_config_llog_handler, cfg, NULL);
parse_out:
rc2 = llog_close(llh);
if (rc == 0)
if (rc)
GOTO(parse_out, rc);
- rc = llog_process(llh, class_config_dump_handler, cfg);
+ rc = llog_process(llh, class_config_dump_handler, cfg, NULL);
parse_out:
rc2 = llog_close(llh);
if (rc == 0)
ENTRY;
// XXX add a storage location for the logid for size changes
-#ifdef ENABLE_ORPHANS
rc = llog_cat_initialize(obd, 1);
if (rc)
CERROR("failed to setup llogging subsystems\n");
-#endif
RETURN(rc);
}
int rc = 0;
ENTRY;
-#ifdef ENABLE_ORPHANS
rc = obd_llog_finish(obd, 0);
if (rc)
CERROR("failed to cleanup llogging subsystem\n");
-#endif
RETURN(rc);
}
cleanup:
switch(cleanup_phase) {
case 3:
- if (fcc != NULL)
- fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
- filter_cancel_cookies_cb, fcc);
+ if (fcc != NULL) {
+ if (oti != NULL) {
+ fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
+ filter_cancel_cookies_cb, fcc);
+ } else {
+ fsfilt_add_journal_cb(obd, 0, handle,
+ filter_cancel_cookies_cb, fcc);
+ }
+ }
rc = filter_finish_transno(exp, oti, rc);
rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
if (rc2) {
struct obd_run_ctxt saved;
struct filter_obd *filter;
struct dentry *dentry;
+ struct llog_ctxt *ctxt;
int rc, rc2;
ENTRY;
/* an objid of zero is taken to mean "sync whole filesystem" */
if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
+ /* flush any remaining cancel messages out to the target */
+ ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+ llog_sync(ctxt, exp);
RETURN(rc);
}
{
struct obd_device *obd;
struct lustre_handle conn;
-#ifdef ENABLE_ORPHANS
struct llog_ctxt *ctxt;
-#endif
int rc = 0;
ENTRY;
CWARN("Received MDS connection ("LPX64")\n", conn.cookie);
memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn));
-#ifdef ENABLE_ORPHANS
ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
-#endif
RETURN(rc);
}
struct obd_device *obd = ctxt->loc_obd;
struct obd_export *exp = obd->obd_self_export;
struct llog_cookie cookie;
+ struct llog_gen_rec *lgr;
struct llog_unlink_rec *lur;
struct obdo *oa;
- struct obd_trans_info oti = { 0 };
obd_id oid;
int rc = 0;
ENTRY;
- if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+ if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
- if (rec->lrh_type != MDS_UNLINK_REC) {
+ if (rec->lrh_type != MDS_UNLINK_REC &&
+ rec->lrh_type != LLOG_GEN_REC) {
CERROR("log record type error\n");
RETURN(-EINVAL);
}
cookie.lgc_lgl = llh->lgh_id;
cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
cookie.lgc_index = le32_to_cpu(rec->lrh_index);
-
+
+ if (rec->lrh_type == LLOG_GEN_REC) {
+ lgr = (struct llog_gen_rec *)rec;
+ if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen))
+ rc = 0;
+ else
+ rc = LLOG_PROC_BREAK;
+ CWARN("fetch generation log, send cookie\n");
+ llog_cancel(ctxt, NULL, 1, &cookie, 0);
+ RETURN(rc);
+ }
+
lur = (struct llog_unlink_rec *)rec;
-
oa = obdo_alloc();
if (oa == NULL)
RETURN(-ENOMEM);
memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie));
oid = oa->o_id;
- rc = obd_destroy(exp, oa, NULL, &oti);
+ rc = obd_destroy(exp, oa, NULL, NULL);
obdo_free(oa);
if (rc == -ENOENT) {
- CWARN("object already removed: send cookie\n");
+ CWARN("object already removed, send cookie\n");
llog_cancel(ctxt, NULL, 1, &cookie, 0);
RETURN(0);
}
DEBUG_REQ(D_INODE, req, "ping");
rc = target_handle_ping(req);
break;
-#ifdef ENABLE_ORPHANS
/* FIXME - just reply status */
case LLOG_ORIGIN_CONNECT:
DEBUG_REQ(D_INODE, req, "log connect\n");
if (rc)
RETURN(rc);
RETURN(ptlrpc_reply(req));
- //break;
case OBD_LOG_CANCEL:
CDEBUG(D_INODE, "log cancel\n");
OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
if (rc)
RETURN(rc);
RETURN(ptlrpc_reply(req));
- //break;
-#endif
case LDLM_ENQUEUE:
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
/* portals/include/config.h.in. Generated from configure.in by autoheader. */
-/* Compile with orphan support */
-#undef ENABLE_ORPHANS
-
/* Use the Pinger */
#undef ENABLE_PINGER
GOTO(out, rc =-EFAULT);
}
memcpy(handle->lgh_hdr, hdr, sizeof (*hdr));
+ handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
out:
if (req)
#ifdef __KERNEL__
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid,
- struct llog_ctxt_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen)
{
+ struct llog_gen_rec *lgr;
struct obd_import *imp;
struct ptlrpc_request *request;
struct llogd_conn_body *req_body;
int rc;
ENTRY;
+ if (list_empty(&ctxt->loc_handle->u.chd.chd_head)) {
+ CDEBUG(D_HA, "there is no record related to ctxt %p", ctxt);
+ RETURN(0);
+ }
+
+ /* FIXME what value for gen->conn_cnt */
+ LLOG_GEN_INC(ctxt->loc_gen);
+
+ /* first add llog_gen_rec */
+ OBD_ALLOC(lgr, sizeof(*lgr));
+ if (!lgr)
+ RETURN(-ENOMEM);
+ lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr);
+ lgr->lgr_hdr.lrh_type = LLOG_GEN_REC;
+ lgr->lgr_gen = ctxt->loc_gen;
+ rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1);
+ OBD_FREE(lgr, sizeof(*lgr));
+ if (rc != 1)
+ RETURN(rc);
+
LASSERT(ctxt->loc_imp);
imp = ctxt->loc_imp;
request = ptlrpc_prep_req(imp, LLOG_ORIGIN_CONNECT, 1, &size, NULL);
- if (!request)
+ if (!request)
RETURN(-ENOMEM);
req_body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*req_body));
req_body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*req_body));
ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
- rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
+ rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
&req_body->lgdc_gen);
- if (rc != 0)
+ if (rc != 0)
CERROR("failed at llog_relp_connect\n");
RETURN(rc);
#include <linux/lustre_log.h>
#include <linux/lustre_net.h>
#include <portals/list.h>
+#include <linux/lustre_fsfilt.h>
int llog_origin_handle_create(struct ptlrpc_request *req)
{
RETURN(rc);
}
-#ifdef ENABLE_ORPHANS
int llog_origin_handle_cancel(struct ptlrpc_request *req)
{
struct obd_device *obd = req->rq_export->exp_obd;
struct obd_device *disk_obd;
struct llog_cookie *logcookies;
struct llog_ctxt *ctxt;
- int num_cookies, rc = 0;
+ int num_cookies, rc = 0, err, i;
struct obd_run_ctxt saved;
struct llog_handle *cathandle;
+ struct inode *inode;
+ void *handle;
ENTRY;
logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
CWARN("llog subsys not setup or already cleanup\n");
RETURN(-ENOENT);
}
- down(&ctxt->loc_sem);
- disk_obd = ctxt->loc_exp->exp_obd;
- cathandle = ctxt->loc_handle;
- LASSERT(cathandle);
+ disk_obd = ctxt->loc_exp->exp_obd;
push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
- rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies);
+ for (i = 0; i < num_cookies; i++, logcookies++) {
+ cathandle = ctxt->loc_handle;
+ LASSERT(cathandle != NULL);
+ inode = cathandle->lgh_file->f_dentry->d_inode;
+
+ handle = fsfilt_start(disk_obd, inode,
+ FSFILT_OP_CANCEL_UNLINK_LOG, NULL);
+ if (IS_ERR(handle)) {
+ CERROR("fsfilt_start failed: %ld\n", PTR_ERR(handle));
+ GOTO(pop_ctxt, rc = PTR_ERR(handle));
+ }
+
+ rc = llog_cat_cancel_records(cathandle, 1, logcookies);
+
+ err = fsfilt_commit(disk_obd, inode, handle, 0);
+ if (err) {
+ CERROR("error committing transaction: %d\n", err);
+ if (!rc)
+ rc = err;
+ GOTO(pop_ctxt, rc);
+ }
+ }
+pop_ctxt:
+ pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
if (rc)
CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
else
- CWARN("cancel %d llog-records\n", num_cookies);
-
- pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
- up(&ctxt->loc_sem);
+ CDEBUG(D_HA, "cancel %d llog-records\n", num_cookies);
RETURN(rc);
}
EXPORT_SYMBOL(llog_origin_handle_cancel);
-#endif
-static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
+static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
char *client)
{
struct mds_obd *mds = &obd->u.mds;
char name[4][64];
int rc, i, l, remains = buf_len;
char *out = buf;
-
+
if (ctxt == NULL || mds == NULL)
RETURN(-EOPNOTSUPP);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-
+
sprintf(name[0], "%s", mds->mds_profile);
sprintf(name[1], "%s-clean", mds->mds_profile);
sprintf(name[2], "%s", client);
sprintf(name[3], "%s-clean", client);
-
+
for (i = 0; i < 4; i++) {
int index, uncanceled = 0;
rc = llog_create(ctxt, &handle, NULL, name[i]);
llog_close(handle);
GOTO(out_pop, rc = -ENOENT);
}
-
+
for (index = 1; index < (LLOG_BITMAP_BYTES * 8); index ++) {
if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap))
uncanceled++;
}
-
+
l = snprintf(out, remains, "[Log Name]: %s\nLog Size: %llu\n"
"Last Index: %d\nUncanceled Records: %d\n\n",
- name[i],
+ name[i],
handle->lgh_file->f_dentry->d_inode->i_size,
- handle->lgh_last_idx,
- uncanceled);
+ handle->lgh_last_idx, uncanceled);
out += l;
remains -= l;
int init;
};
-static int llog_catinfo_cb(struct llog_handle *cat,
+static int llog_catinfo_cb(struct llog_handle *cat,
struct llog_rec_hdr *rec, void *data)
{
static char *out = NULL;
if (!(cat->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
RETURN(-EINVAL);
-
+
lir = (struct llog_logid_rec *)rec;
logid = &lir->lid_id;
rc = llog_create(ctxt, &handle, logid, NULL);
rc = llog_init_handle(handle, 0, NULL);
if (rc)
GOTO(out_close, rc);
-
+
for (index = 1; index < (LLOG_BITMAP_BYTES * 8); index++) {
if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap))
count++;
CWARN("Not enough memory\n");
rc = -ENOMEM;
}
-
+
out_close:
llog_close(handle);
RETURN(rc);
}
-
-static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
+
+static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
int buf_len)
{
struct mds_obd *mds = &obd->u.mds;
int rc;
struct cb_data data;
struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-
+
if (ctxt == NULL || mds == NULL)
RETURN(-EOPNOTSUPP);
-
+
count = mds->mds_lov_desc.ld_tgt_count;
size = sizeof(*idarray) * count;
-
+
OBD_ALLOC(idarray, size);
if (!idarray)
RETURN(-ENOMEM);
memset(idarray, 0, size);
-
+
rc = llog_get_cat_list(obd, obd, name, count, idarray);
- if (rc)
+ if (rc)
GOTO(out_free, rc);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-
+
id = idarray;
data.ctxt = ctxt;
data.out = buf;
if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap))
uncanceled++;
}
- l = snprintf(data.out, data.remains,
+ l = snprintf(data.out, data.remains,
"\n[Catlog ID]: #"LPX64"#"LPX64"#%08x "
"[Log Count]: %d\n",
id->lgl_oid, id->lgl_ogr, id->lgl_ogen,
uncanceled);
-
+
data.out += l;
data.remains -= l;
data.init = 1;
-
- llog_process(handle, llog_catinfo_cb, &data);
+
+ llog_process(handle, llog_catinfo_cb, &data, NULL);
llog_close(handle);
-
+
if (data.remains <= 0)
break;
}
OBD_FREE(idarray, size);
RETURN(rc);
}
-
+
int llog_catinfo(struct ptlrpc_request *req)
{
struct obd_export *exp = req->rq_export;
if (strlen(buf) == 0)
sprintf(buf, "%s", "No log informations\n");
memcpy(reply, buf, buf_len);
-
+
out_free:
OBD_FREE(buf, buf_len);
return rc;
#include <linux/lustre_log.h>
#include "ptlrpc_internal.h"
+#define LLCD_SIZE 4096
+
#ifdef __KERNEL__
static struct llog_commit_master lustre_lcm;
struct llog_canceld_ctxt *llcd;
int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
- OBD_ALLOC(llcd, PAGE_SIZE + offset);
+ OBD_ALLOC(llcd, LLCD_SIZE + offset);
if (llcd == NULL)
return -ENOMEM;
int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
- OBD_FREE(llcd, PAGE_SIZE + offset);
+ OBD_FREE(llcd, LLCD_SIZE + offset);
} else {
spin_lock(&lcm->lcm_llcd_lock);
list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
EXPORT_SYMBOL(llcd_send);
/* deleted objects have a commit callback that cancels the MDS
- * log record for the deletion. The commit callback calls this
- * function
+ * log record for the deletion. The commit callback calls this
+ * function
*/
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
struct lov_stripe_md *lsm, int count,
cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
GOTO(out, rc = -ENOMEM);
}
- llcd->llcd_import = ctxt->loc_imp;
- llcd->llcd_gen = ctxt->loc_gen;
+ llcd->llcd_ctxt = ctxt;
ctxt->loc_llcd = llcd;
}
llcd->llcd_cookiebytes += sizeof(*cookies);
send_now:
- if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
+ if ((LLCD_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
flags & OBD_LLOG_FL_SENDNOW)) {
- CDEBUG(D_HA, "send llcd: %p\n", llcd);
+ CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
ctxt->loc_llcd = NULL;
llcd_send(llcd);
}
int rc = 0;
ENTRY;
- LASSERT(ctxt->loc_llcd);
-
if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
- CWARN("import will be destroyed, put llcd %p\n",
- ctxt->loc_llcd);
- llcd_put(ctxt->loc_llcd);
- ctxt->loc_llcd = NULL;
+ down(&ctxt->loc_sem);
+ if (ctxt->loc_llcd != NULL) {
+ CWARN("import will be destroyed, put "
+ "llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+ llcd_put(ctxt->loc_llcd);
+ ctxt->loc_llcd = NULL;
+ ctxt->loc_imp = NULL;
+ }
up(&ctxt->loc_sem);
} else {
- up(&ctxt->loc_sem);
rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
}
llcd = list_entry(lcd->lcd_llcd_list.next,
typeof(*llcd), llcd_list);
LASSERT(llcd->llcd_lcm == lcm);
- import = llcd->llcd_import;
+ import = llcd->llcd_ctxt->loc_imp;
}
list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
LASSERT(llcd->llcd_lcm == lcm);
- if (import == llcd->llcd_import)
+ if (import == llcd->llcd_ctxt->loc_imp)
list_move_tail(&llcd->llcd_list,
&lcd->lcd_llcd_list);
}
list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
llcd_list) {
LASSERT(llcd->llcd_lcm == lcm);
- if (import == llcd->llcd_import)
+ if (import == llcd->llcd_ctxt->loc_imp)
list_move_tail(&llcd->llcd_list,
&lcd->lcd_llcd_list);
}
/* We are the only one manipulating our local list - no lock */
list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
char *bufs[1] = {(char *)llcd->llcd_cookies};
- struct obd_device *obd = import->imp_obd;
- struct llog_ctxt *ctxt;
list_del(&llcd->llcd_list);
if (llcd->llcd_cookiebytes == 0) {
- CDEBUG(D_HA, "just put empty llcd %p\n", llcd);
+ CDEBUG(D_HA, "put empty llcd %p:%p\n",
+ llcd, llcd->llcd_ctxt);
llcd_put(llcd);
continue;
}
- /* check whether the cookies are new. if new then send, otherwise
- * just put llcd */
- ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1);
- LASSERT(ctxt != NULL);
- down(&ctxt->loc_sem);
- if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) {
- up(&ctxt->loc_sem);
- CDEBUG(D_HA, "just put stale llcd %p\n", llcd);
+
+ down(&llcd->llcd_ctxt->loc_sem);
+ if (llcd->llcd_ctxt->loc_imp == NULL) {
+ up(&llcd->llcd_ctxt->loc_sem);
+ CWARN("import will be destroyed, put "
+ "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
llcd_put(llcd);
continue;
}
- up(&ctxt->loc_sem);
request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
&llcd->llcd_cookiebytes,
bufs);
+ up(&llcd->llcd_ctxt->loc_sem);
+
if (request == NULL) {
rc = -ENOMEM;
CERROR("error preparing commit: rc %d\n", rc);
}
request->rq_replen = lustre_msg_size(0, NULL);
+ down(&llcd->llcd_ctxt->loc_sem);
+ if (llcd->llcd_ctxt->loc_imp == NULL) {
+ up(&llcd->llcd_ctxt->loc_sem);
+ CWARN("import will be destroyed, put "
+ "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
+ llcd_put(llcd);
+ ptlrpc_req_finished(request);
+ continue;
+ }
rc = ptlrpc_queue_wait(request);
ptlrpc_req_finished(request);
+ up(&llcd->llcd_ctxt->loc_sem);
/* If the RPC failed, we put this and the remaining
* messages onto the resend list for another time. */
continue;
}
-#if 0 /* FIXME just put llcd, not send it again */
+#if 0 /* FIXME just put llcd, not put it on resend list */
spin_lock(&lcm->lcm_llcd_lock);
list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
if (++llcd->llcd_tries < 5) {
} else {
spin_unlock(&lcm->lcm_llcd_lock);
#endif
- CERROR("commit %p dropped %d cookies: rc %d\n",
- llcd, (int)(llcd->llcd_cookiebytes /
- sizeof(*llcd->llcd_cookies)),
- rc);
+ CERROR("commit %p:%p drop %d cookies: rc %d\n",
+ llcd, llcd->llcd_ctxt,
+ (int)(llcd->llcd_cookiebytes /
+ sizeof(*llcd->llcd_cookies)), rc);
llcd_put(llcd);
-// }
+#if 0
+ }
break;
+#endif
}
if (rc == 0) {
EXPORT_SYMBOL(llog_start_commit_thread);
static struct llog_process_args {
- struct semaphore llpa_sem;
+ struct semaphore llpa_sem;
struct llog_ctxt *llpa_ctxt;
void *llpa_cb;
void *llpa_arg;
} llpa;
+
int llog_init_commit_master(void)
{
INIT_LIST_HEAD(&lcm->lcm_thread_busy);
return 0;
}
-
static int log_process_thread(void *args)
{
struct llog_process_args *data = args;
unsigned long flags;
int rc;
ENTRY;
-
+
up(&data->llpa_sem);
lock_kernel();
ptlrpc_daemonize(); /* thread never needs to do IO */
-
+
SIGNAL_MASK_LOCK(current, flags);
sigfillset(¤t->blocked);
RECALC_SIGPENDING;
SIGNAL_MASK_UNLOCK(current, flags);
unlock_kernel();
-
+
rc = llog_create(ctxt, &llh, &logid, NULL);
if (rc) {
CERROR("llog_create failed %d\n", rc);
CERROR("llog_init_handle failed %d\n", rc);
GOTO(out, rc);
}
-
+
if (cb) {
rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
- if (rc)
+ if (rc != LLOG_PROC_BREAK)
CERROR("llog_cat_process failed %d\n", rc);
- } else
+ } else {
CWARN("no callback function for recovery\n");
+ }
- CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd);
+ CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
+ ctxt->loc_llcd, ctxt);
llog_sync(ctxt, NULL);
out:
rc = llog_cat_put(llh);
if (rc)
CERROR("llog_cat_put failed %d\n", rc);
-
+
RETURN(rc);
}
-static int llog_recovery_generic(struct llog_ctxt *ctxt,
- void *handle,
- void *arg)
+
+static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
{
int rc;
ENTRY;
RETURN(rc);
}
+
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen)
{
struct llog_canceld_ctxt *llcd;
int rc;
ENTRY;
-
+
+ /* send back llcd before recovery from llog */
+ if (ctxt->loc_llcd != NULL) {
+ CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
+ llog_sync(ctxt, NULL);
+ }
+
down(&ctxt->loc_sem);
ctxt->loc_gen = *gen;
- llcd = ctxt->loc_llcd;
- if (llcd) {
- CDEBUG(D_HA, "put current llcd when new connection arrives\n");
- llcd_put(llcd);
- }
llcd = llcd_grab();
if (llcd == NULL) {
CERROR("couldn't get an llcd\n");
+ up(&ctxt->loc_sem);
RETURN(-ENOMEM);
}
- llcd->llcd_import = ctxt->loc_imp;
- llcd->llcd_gen = ctxt->loc_gen;
+ llcd->llcd_ctxt = ctxt;
ctxt->loc_llcd = llcd;
up(&ctxt->loc_sem);
- rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
+ rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
if (rc != 0)
CERROR("error recovery process: %d\n", rc);
. ${CONFIG:=$LUSTRE/tests/cfg/local.sh}
# Skip these tests
-ALWAYS_EXCEPT="35"
+ALWAYS_EXCEPT=""
gen_config() {
}
run_test 37 "abort recovery before client does replay (test mds_cleanup_orphans for directories)"
+test_38() {
+ for i in `seq 1 800`; do
+ touch $DIR/$tfile-$i
+ done
+ for i in `seq 1 400`; do
+ rm $DIR/$tfile-$i
+ done
+
+ replay_barrier mds
+ fail mds
+ for i in `seq 401 800`; do
+ rm $DIR/$tfile-$i
+ done
+ sleep 2
+ $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
+}
+run_test 38 "test recovery from unlink llog (test llog_gen_rec) "
+
+test_39() {
+ for i in `seq 1 800`; do
+ touch $DIR/$tfile-$i
+ done
+
+ replay_barrier mds
+ for i in `seq 1 400`; do
+ rm $DIR/$tfile-$i
+ done
+ fail mds
+ for i in `seq 401 800`; do
+ rm $DIR/$tfile-$i
+ done
+ sleep 2
+ $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
+}
+run_test 39 "test recovery from unlink llog (test llog_gen_rec) "
+
equals_msg test complete, cleaning up
$CLEANUP
CHECK_VALUE(OBD_MD_FLOSCOPQ);
CHECK_VALUE(OBD_MD_FLCOOKIE);
CHECK_VALUE(OBD_MD_FLGROUP);
+
+ CHECK_VALUE(OBD_FL_INLINEDATA);
+ CHECK_VALUE(OBD_FL_OBDMDEXISTS);
+ CHECK_VALUE(OBD_FL_DELORPHAN);
+ CHECK_VALUE(OBD_FL_NORPC);
+ CHECK_VALUE(OBD_FL_IDONLY);
}
void
CHECK_VALUE(OBD_BRW_WRITE);
CHECK_VALUE(OBD_BRW_CREATE);
CHECK_VALUE(OBD_BRW_SYNC);
+ CHECK_VALUE(OBD_BRW_FROM_GRANT);
}
void
CHECK_VALUE(FMODE_EXEC);
CHECK_VALUE(MDS_OPEN_CREAT);
CHECK_VALUE(MDS_OPEN_EXCL);
+ CHECK_VALUE(MDS_OPEN_TRUNC);
CHECK_VALUE(MDS_OPEN_APPEND);
CHECK_VALUE(MDS_OPEN_SYNC);
CHECK_VALUE(MDS_OPEN_DIRECTORY);
CHECK_VALUE(MDS_UNLINK_REC);
CHECK_VALUE(OBD_CFG_REC);
CHECK_VALUE(PTL_CFG_REC);
+ CHECK_VALUE(LLOG_GEN_REC);
CHECK_VALUE(LLOG_HDR_MAGIC);
CHECK_VALUE(LLOG_LOGID_MAGIC);
}
}
void
+check_llog_logid_rec(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_logid_rec);
+ CHECK_MEMBER(llog_logid_rec, lid_hdr);
+ CHECK_MEMBER(llog_logid_rec, lid_id);
+ CHECK_MEMBER(llog_logid_rec, lid_tail);
+}
+
+void
+check_llog_create_rec(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_create_rec);
+ CHECK_MEMBER(llog_create_rec, lcr_hdr);
+ CHECK_MEMBER(llog_create_rec, lcr_fid);
+ CHECK_MEMBER(llog_create_rec, lcr_oid);
+ CHECK_MEMBER(llog_create_rec, lcr_ogen);
+}
+
+void
+check_llog_orphan_rec(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_orphan_rec);
+ CHECK_MEMBER(llog_orphan_rec, lor_hdr);
+ CHECK_MEMBER(llog_orphan_rec, lor_oid);
+ CHECK_MEMBER(llog_orphan_rec, lor_ogen);
+ CHECK_MEMBER(llog_orphan_rec, lor_tail);
+}
+
+void
+check_llog_unlink_rec(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_unlink_rec);
+ CHECK_MEMBER(llog_unlink_rec, lur_hdr);
+ CHECK_MEMBER(llog_unlink_rec, lur_oid);
+ CHECK_MEMBER(llog_unlink_rec, lur_ogen);
+ CHECK_MEMBER(llog_unlink_rec, lur_tail);
+}
+
+void
+check_llog_size_change_rec(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_size_change_rec);
+ CHECK_MEMBER(llog_size_change_rec, lsc_hdr);
+ CHECK_MEMBER(llog_size_change_rec, lsc_fid);
+ CHECK_MEMBER(llog_size_change_rec, lsc_io_epoch);
+ CHECK_MEMBER(llog_size_change_rec, lsc_tail);
+}
+
+void
+check_llog_gen(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_gen);
+ CHECK_MEMBER(llog_gen, mnt_cnt);
+ CHECK_MEMBER(llog_gen, conn_cnt);
+}
+
+void
+check_llog_gen_rec(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_gen_rec);
+ CHECK_MEMBER(llog_gen_rec, lgr_hdr);
+ CHECK_MEMBER(llog_gen_rec, lgr_gen);
+ CHECK_MEMBER(llog_gen_rec, lgr_tail);
+}
+
+void
check_llog_log_hdr(void)
{
BLANK_LINE();
CHECK_VALUE(LLOG_ORIGIN_HANDLE_READ_HEADER);
CHECK_VALUE(LLOG_ORIGIN_HANDLE_WRITE_REC);
CHECK_VALUE(LLOG_ORIGIN_HANDLE_CLOSE);
-}
-
-void
-check_llog_ctxt_gen(void)
-{
- BLANK_LINE();
- CHECK_STRUCT(llog_ctxt_gen);
- CHECK_MEMBER(llog_ctxt_gen, mnt_cnt);
- CHECK_MEMBER(llog_ctxt_gen, conn_cnt);
+ CHECK_VALUE(LLOG_ORIGIN_CONNECT);
+ CHECK_VALUE(LLOG_CATINFO);
}
void
CHECK_VALUE(OST_SAN_WRITE);
CHECK_VALUE(OST_SYNC);
CHECK_VALUE(OST_LAST_OPC);
- CHECK_VALUE(OST_FIRST_OPC);
-
- CHECK_VALUE(OBD_FL_INLINEDATA);
- CHECK_VALUE(OBD_FL_OBDMDEXISTS);
CHECK_DEFINE(OBD_OBJECT_EOF);
CHECK_VALUE(MDS_DISCONNECT);
CHECK_VALUE(MDS_GETSTATUS);
CHECK_VALUE(MDS_STATFS);
+ CHECK_VALUE(MDS_PIN);
+ CHECK_VALUE(MDS_UNPIN);
+ CHECK_VALUE(MDS_SYNC);
+ CHECK_VALUE(MDS_DONE_WRITING);
CHECK_VALUE(MDS_LAST_OPC);
- CHECK_VALUE(MDS_FIRST_OPC);
CHECK_VALUE(REINT_SETATTR);
CHECK_VALUE(REINT_CREATE);
CHECK_VALUE(LDLM_BL_CALLBACK);
CHECK_VALUE(LDLM_CP_CALLBACK);
CHECK_VALUE(LDLM_LAST_OPC);
- CHECK_VALUE(LDLM_FIRST_OPC);
+
+ CHECK_VALUE(LCK_EX);
+ CHECK_VALUE(LCK_PW);
+ CHECK_VALUE(LCK_PR);
+ CHECK_VALUE(LCK_CW);
+ CHECK_VALUE(LCK_CR);
+ CHECK_VALUE(LCK_NL);
CHECK_VALUE(PTLBD_QUERY);
CHECK_VALUE(PTLBD_READ);
CHECK_VALUE(PTLBD_CONNECT);
CHECK_VALUE(PTLBD_DISCONNECT);
CHECK_VALUE(PTLBD_LAST_OPC);
- CHECK_VALUE(PTLBD_FIRST_OPC);
+
+ CHECK_VALUE(MGMT_CONNECT);
+ CHECK_VALUE(MGMT_DISCONNECT);
+ CHECK_VALUE(MGMT_EXCEPTION);
CHECK_VALUE(OBD_PING);
+ CHECK_VALUE(OBD_LOG_CANCEL);
+ CHECK_VALUE(OBD_LAST_OPC);
COMMENT("Sizes and Offsets");
BLANK_LINE();
LASSERT(OST_SAN_WRITE == 15);
LASSERT(OST_SYNC == 16);
LASSERT(OST_LAST_OPC == 18);
- LASSERT(OST_FIRST_OPC == 0);
- LASSERT(OBD_FL_INLINEDATA == 1);
- LASSERT(OBD_FL_OBDMDEXISTS == 2);
LASSERT(OBD_OBJECT_EOF == 0xffffffffffffffffULL);
LASSERT(OST_REQ_HAS_OA1 == 1);
LASSERT(MDS_GETATTR == 33);
LASSERT(MDS_DISCONNECT == 39);
LASSERT(MDS_GETSTATUS == 40);
LASSERT(MDS_STATFS == 41);
- LASSERT(MDS_LAST_OPC == 45);
- LASSERT(MDS_FIRST_OPC == 33);
+ LASSERT(MDS_PIN == 42);
+ LASSERT(MDS_UNPIN == 43);
+ LASSERT(MDS_SYNC == 44);
+ LASSERT(MDS_DONE_WRITING == 45);
+ LASSERT(MDS_LAST_OPC == 46);
LASSERT(REINT_SETATTR == 1);
LASSERT(REINT_CREATE == 2);
LASSERT(REINT_LINK == 3);
LASSERT(LDLM_BL_CALLBACK == 104);
LASSERT(LDLM_CP_CALLBACK == 105);
LASSERT(LDLM_LAST_OPC == 106);
- LASSERT(LDLM_FIRST_OPC == 101);
+ LASSERT(LCK_EX == 1);
+ LASSERT(LCK_PW == 2);
+ LASSERT(LCK_PR == 3);
+ LASSERT(LCK_CW == 4);
+ LASSERT(LCK_CR == 5);
+ LASSERT(LCK_NL == 6);
LASSERT(PTLBD_QUERY == 200);
LASSERT(PTLBD_READ == 201);
LASSERT(PTLBD_WRITE == 202);
LASSERT(PTLBD_CONNECT == 204);
LASSERT(PTLBD_DISCONNECT == 205);
LASSERT(PTLBD_LAST_OPC == 206);
- LASSERT(PTLBD_FIRST_OPC == 200);
+ LASSERT(MGMT_CONNECT == 250);
+ LASSERT(MGMT_DISCONNECT == 251);
+ LASSERT(MGMT_EXCEPTION == 252);
LASSERT(OBD_PING == 400);
+ LASSERT(OBD_LOG_CANCEL == 401);
+ LASSERT(OBD_LAST_OPC == 402);
/* Sizes and Offsets */
LASSERT(OBD_MD_FLOSCOPQ == 4194304);
LASSERT(OBD_MD_FLCOOKIE == 8388608);
LASSERT(OBD_MD_FLGROUP == 16777216);
+ LASSERT(OBD_FL_INLINEDATA == 1);
+ LASSERT(OBD_FL_OBDMDEXISTS == 2);
+ LASSERT(OBD_FL_DELORPHAN == 4);
+ LASSERT(OBD_FL_NORPC == 8);
+ LASSERT(OBD_FL_IDONLY == 16);
/* Checks for struct lov_mds_md_v1 */
LASSERT((int)sizeof(struct lov_mds_md_v1) == 32);
LASSERT(OBD_BRW_WRITE == 2);
LASSERT(OBD_BRW_CREATE == 4);
LASSERT(OBD_BRW_SYNC == 8);
+ LASSERT(OBD_BRW_FROM_GRANT == 32);
/* Checks for struct ost_body */
LASSERT((int)sizeof(struct ost_body) == 168);
LASSERT(FMODE_EXEC == 4);
LASSERT(MDS_OPEN_CREAT == 64);
LASSERT(MDS_OPEN_EXCL == 128);
+ LASSERT(MDS_OPEN_TRUNC == 512);
LASSERT(MDS_OPEN_APPEND == 1024);
LASSERT(MDS_OPEN_SYNC == 4096);
LASSERT(MDS_OPEN_DIRECTORY == 65536);
LASSERT(MDS_UNLINK_REC == 274801668);
LASSERT(OBD_CFG_REC == 274857984);
LASSERT(PTL_CFG_REC == 274923520);
+ LASSERT(LLOG_GEN_REC == 274989056);
LASSERT(LLOG_HDR_MAGIC == 275010873);
LASSERT(LLOG_LOGID_MAGIC == 275010874);
LASSERT(LLOG_ORIGIN_HANDLE_READ_HEADER == 503);
LASSERT(LLOG_ORIGIN_HANDLE_WRITE_REC == 504);
LASSERT(LLOG_ORIGIN_HANDLE_CLOSE == 505);
+ LASSERT(LLOG_ORIGIN_CONNECT == 506);
+ LASSERT(LLOG_CATINFO == 507);
/* Checks for struct llogd_conn_body */
LASSERT((int)sizeof(struct llogd_conn_body) == 40);