From 5d5203454e8ca6b7ea3657e8f0d60d67471e2009 Mon Sep 17 00:00:00 2001 From: tappro Date: Mon, 11 Sep 2006 23:36:00 +0000 Subject: [PATCH] update after codeinsp --- lustre/mdt/mdt_internal.h | 34 ++-- lustre/mdt/mdt_recovery.c | 383 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 301 insertions(+), 116 deletions(-) diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 2bd74d4..39a4e88 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -71,25 +71,18 @@ struct mdt_client_data { static inline __u64 mcd_last_transno(struct mdt_client_data *mcd) { - return (le64_to_cpu(mcd->mcd_last_transno) > - le64_to_cpu(mcd->mcd_last_close_transno) ? - le64_to_cpu(mcd->mcd_last_transno) : - le64_to_cpu(mcd->mcd_last_close_transno)); + return max(mcd->mcd_last_transno, mcd->mcd_last_close_transno); } static inline __u64 mcd_last_xid(struct mdt_client_data *mcd) { - return (le64_to_cpu(mcd->mcd_last_xid) > - le64_to_cpu(mcd->mcd_last_close_xid) ? - le64_to_cpu(mcd->mcd_last_xid) : - le64_to_cpu(mcd->mcd_last_close_xid)); + return max(mcd->mcd_last_xid, mcd->mcd_last_close_xid); } /* copied from lr_server_data. * mds data stored at the head of last_rcvd file. In le32 order. */ struct mdt_server_data { __u8 msd_uuid[40]; /* server UUID */ - __u64 msd_unused; /* was fsd_last_objid - don't use for now */ __u64 msd_last_transno; /* last completed transaction ID */ __u64 msd_mount_count; /* incarnation number */ __u32 msd_feature_compat; /* compatible feature flags */ @@ -98,13 +91,13 @@ struct mdt_server_data { __u32 msd_server_size; /* size of server data area */ __u32 msd_client_start; /* start of per-client data area */ __u16 msd_client_size; /* size of per-client data area */ - __u16 msd_subdir_count; /* number of subdirectories for objects */ - __u64 msd_catalog_oid; /* recovery catalog object id */ - __u32 msd_catalog_ogen; /* recovery catalog inode generation */ - __u8 msd_peeruuid[40]; /* UUID of MDS associated with this OST */ - __u32 msd_ost_index; /* index number of OST in LOV */ - __u32 msd_mdt_index; /* index number of MDT in LMV */ - __u8 msd_padding[LR_SERVER_SIZE - 148]; + //__u16 msd_subdir_count; /* number of subdirectories for objects */ + //__u64 msd_catalog_oid; /* recovery catalog object id */ + //__u32 msd_catalog_ogen; /* recovery catalog inode generation */ + //__u8 msd_peeruuid[40]; /* UUID of MDS associated with this OST */ + //__u32 msd_ost_index; /* index number of OST in LOV */ + //__u32 msd_mdt_index; /* index number of MDT in LMV */ + __u8 msd_padding[LR_SERVER_SIZE - 78]; }; struct mdt_object; @@ -163,7 +156,9 @@ struct mdt_device { int mdt_max_cookiesize; __u64 mdt_mount_count; + /* last_rcvd data */ struct mdt_server_data mdt_msd; + spinlock_t mdt_client_bitmap_lock; unsigned long mdt_client_bitmap[(LR_MAX_CLIENTS >> 3) / sizeof(long)]; }; @@ -296,6 +291,13 @@ struct mdt_thread_info { struct l_wait_info mti_wait_info; } rdpg; } mti_u; + + /* server and client data buffers */ + struct mdt_server_data mti_msd; + struct mdt_client_data mti_mcd; + loff_t mti_off; + struct txn_param mti_txn_param; + }; /* * Info allocated per-transaction. diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index b5bd3c1..5f98f7c 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -6,7 +6,7 @@ * * Copyright (C) 2002-2006 Cluster File Systems, Inc. * Author: Huang Hua - * Author; Pershin Mike + * Author: Pershin Mike * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. @@ -33,15 +33,13 @@ #include "mdt_internal.h" -static int mdt_update_server_data(const struct lu_context *ctx, +static int mdt_server_data_update(const struct lu_context *ctx, struct mdt_device *mdt); /* TODO: maybe this pair should be defined in dt_object.c */ -static /*inline*/ ssize_t mdt_read_record(const struct lu_context *ctx, - struct dt_object *dt, - void *buf, - size_t count, - loff_t *pos) +static int mdt_record_read(const struct lu_context *ctx, + struct dt_object *dt, void *buf, + size_t count, loff_t *pos) { int rc; @@ -56,17 +54,14 @@ static /*inline*/ ssize_t mdt_read_record(const struct lu_context *ctx, return rc; } -static /*inline*/ ssize_t mdt_write_record(const struct lu_context *ctx, - struct dt_object *dt, - const void *buf, - size_t count, - loff_t *pos, - struct thandle *th) +static int mdt_record_write(const struct lu_context *ctx, + struct dt_object *dt, const void *buf, + size_t count, loff_t *pos, struct thandle *th) { int rc; LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); - + LASSERT(th != NULL); rc = dt->do_body_ops->dbo_write(ctx, dt, buf, count, pos, th); if (rc == count) rc = 0; @@ -74,53 +69,173 @@ static /*inline*/ ssize_t mdt_write_record(const struct lu_context *ctx, rc = -EFAULT; return rc; } +/* only one record write */ + +enum { + MDT_TXN_WRITE_RECORD_CREDITS = 3 +}; + +static struct thandle* mdt_trans_start(const struct lu_context *ctx, + struct mdt_device *mdt) +{ + struct mdt_thread_info *mti; + struct txn_param *p; + + mti = lu_context_key_get(ctx, &mdt_thread_key); + p = &mti->mti_txn_param; + return mdt->mdt_bottom->dd_ops->dt_trans_start(ctx, mdt->mdt_bottom, p); +} + +static void mdt_trans_stop(const struct lu_context *ctx, + struct mdt_device *mdt, struct thandle *th) +{ + mdt->mdt_bottom->dd_ops->dt_trans_stop(ctx, th); +} /* last_rcvd handling */ -static int mdt_read_last_rcvd_header(const struct lu_context *ctx, +static inline void msd_le_to_cpu(struct mdt_server_data *buf, + struct mdt_server_data *msd) +{ + msd->msd_last_transno = le64_to_cpu(buf->msd_last_transno); + msd->msd_mount_count = le64_to_cpu(buf->msd_mount_count); + msd->msd_feature_compat = le32_to_cpu(buf->msd_feature_compat); + msd->msd_feature_rocompat = le32_to_cpu(buf->msd_feature_rocompat); + msd->msd_feature_incompat = le32_to_cpu(buf->msd_feature_incompat); + msd->msd_server_size = le32_to_cpu(buf->msd_server_size); + msd->msd_client_start = le32_to_cpu(buf->msd_client_start); + msd->msd_client_size = le16_to_cpu(buf->msd_client_size); +} + +static inline void msd_cpu_to_le(struct mdt_server_data *msd, + struct mdt_server_data *buf) +{ + buf->msd_last_transno = cpu_to_le64(msd->msd_last_transno); + buf->msd_mount_count = cpu_to_le64(msd->msd_mount_count); + buf->msd_feature_compat = cpu_to_le32(msd->msd_feature_compat); + buf->msd_feature_rocompat = cpu_to_le32(msd->msd_feature_rocompat); + buf->msd_feature_incompat = cpu_to_le32(msd->msd_feature_incompat); + buf->msd_server_size = cpu_to_le32(msd->msd_server_size); + buf->msd_client_start = cpu_to_le32(msd->msd_client_start); + buf->msd_client_size = cpu_to_le16(msd->msd_client_size); +} + +static inline void mcd_le_to_cpu(struct mdt_client_data *buf, + struct mdt_client_data *mcd) +{ + mcd->mcd_last_transno = le64_to_cpu(buf->mcd_last_transno); + mcd->mcd_last_xid = le64_to_cpu(buf->mcd_last_xid); + mcd->mcd_last_result = le32_to_cpu(buf->mcd_last_result); + mcd->mcd_last_data = le32_to_cpu(buf->mcd_last_data); + mcd->mcd_last_close_transno = le64_to_cpu(buf->mcd_last_close_transno); + mcd->mcd_last_close_xid = le64_to_cpu(buf->mcd_last_close_xid); + mcd->mcd_last_close_result = le32_to_cpu(buf->mcd_last_close_result); +} + +static inline void mcd_cpu_to_le(struct mdt_client_data *mcd, + struct mdt_client_data *buf) +{ + buf->mcd_last_transno = cpu_to_le64(mcd->mcd_last_transno); + buf->mcd_last_xid = cpu_to_le64(mcd->mcd_last_xid); + buf->mcd_last_result = cpu_to_le32(mcd->mcd_last_result); + buf->mcd_last_data = cpu_to_le32(mcd->mcd_last_data); + buf->mcd_last_close_transno = cpu_to_le64(mcd->mcd_last_close_transno); + buf->mcd_last_close_xid = cpu_to_le64(mcd->mcd_last_close_xid); + buf->mcd_last_close_result = cpu_to_le32(mcd->mcd_last_close_result); +} + +static int mdt_last_rcvd_header_read(const struct lu_context *ctx, struct mdt_device *mdt, struct mdt_server_data *msd) { - loff_t off = 0; - return mdt_read_record(ctx, mdt->mdt_last_rcvd, - msd, sizeof(*msd), &off); + struct mdt_thread_info *mti; + struct mdt_server_data *tmp; + loff_t *off; + int rc; + + mti = lu_context_key_get(ctx, &mdt_thread_key); + /* temporary stuff for read */ + tmp = &mti->mti_msd; + off = &mti->mti_off; + rc = mdt_record_read(ctx, mdt->mdt_last_rcvd, + tmp, sizeof(*tmp), off); + if (rc == 0) { + memcpy(msd->msd_uuid, tmp->msd_uuid, sizeof (msd->msd_uuid)); + msd_le_to_cpu(tmp, msd); + } + return 0; } -static int mdt_write_last_rcvd_header(const struct lu_context *ctx, +static int mdt_last_rcvd_header_write(const struct lu_context *ctx, struct mdt_device *mdt, - struct mdt_server_data *msd, - struct thandle *th) + struct mdt_server_data *msd) { + struct mdt_thread_info *mti; + struct mdt_server_data *tmp; + struct thandle *th; + loff_t *off; int rc; - loff_t off = 0; - rc = mdt_write_record(ctx, mdt->mdt_last_rcvd, - msd, sizeof(*msd), &off, th); - - CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n" - "uuid = %s\n" - "last_transno = "LPU64"\n", - rc, - msd->msd_uuid, - msd->msd_last_transno); + mti = lu_context_key_get(ctx, &mdt_thread_key); + + mti->mti_txn_param.tp_credits = MDT_TXN_WRITE_RECORD_CREDITS; + th = mdt_trans_start(ctx, mdt); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + /* temporary stuff for read */ + tmp = &mti->mti_msd; + off = &mti->mti_off; + + memcpy(tmp->msd_uuid, msd->msd_uuid, sizeof (msd->msd_uuid)); + msd_cpu_to_le(msd, tmp); + + rc = mdt_record_write(ctx, mdt->mdt_last_rcvd, + tmp, sizeof(*tmp), off, th); + + CDEBUG(D_INFO, "write last_rcvd header (rc = %d):\n" + "uuid = %s\nlast_transno = "LPU64"\n", + rc, msd->msd_uuid, msd->msd_last_transno); + + mdt_trans_stop(ctx, mdt, th); return rc; } -static int mdt_read_last_rcvd(const struct lu_context *ctx, +static int mdt_last_rcvd_read(const struct lu_context *ctx, struct mdt_device *mdt, struct mdt_client_data *mcd, loff_t *off) { - return mdt_read_record(ctx, mdt->mdt_last_rcvd, - mcd, sizeof(*mcd), off); + struct mdt_thread_info *mti; + struct mdt_client_data *tmp; + int rc; + + mti = lu_context_key_get(ctx, &mdt_thread_key); + tmp = &mti->mti_mcd; + rc = mdt_record_read(ctx, mdt->mdt_last_rcvd, tmp, sizeof(*tmp), off); + if (rc == 0) { + memcpy(mcd->mcd_uuid, tmp->mcd_uuid, sizeof (mcd->mcd_uuid)); + mcd_le_to_cpu(tmp, mcd); + } + return rc; } -static int mdt_write_last_rcvd(const struct lu_context *ctx, +static int mdt_last_rcvd_write(const struct lu_context *ctx, struct mdt_device *mdt, struct mdt_client_data *mcd, loff_t *off, struct thandle *th) { + struct mdt_thread_info *mti; + struct mdt_client_data *tmp; int rc; - rc = mdt_write_record(ctx, mdt->mdt_last_rcvd, - mcd, sizeof(*mcd), off, th); + + LASSERT(th != NULL); + mti = lu_context_key_get(ctx, &mdt_thread_key); + tmp = &mti->mti_mcd; + + memcpy(mcd->mcd_uuid, tmp->mcd_uuid, sizeof (mcd->mcd_uuid)); + mcd_cpu_to_le(mcd, tmp); + + rc = mdt_record_write(ctx, mdt->mdt_last_rcvd, + tmp, sizeof(*tmp), off, th); CDEBUG(D_INFO, "write mcd rc = %d:\n" "uuid = %s\n" @@ -143,14 +258,15 @@ static int mdt_write_last_rcvd(const struct lu_context *ctx, return rc; } -static int mdt_init_clients_data(const struct lu_context *ctx, - struct mdt_device *mdt, - unsigned long last_size) + +static int mdt_clients_data_init(const struct lu_context *ctx, + struct mdt_device *mdt, + unsigned long last_size) { struct mdt_server_data *msd = &mdt->mdt_msd; struct mdt_client_data *mcd = NULL; struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd; - loff_t off = 0; + loff_t off; int cl_idx; int rc = 0; ENTRY; @@ -159,22 +275,22 @@ static int mdt_init_clients_data(const struct lu_context *ctx, * the header. If we find clients with higher last_transno values * then those clients may need recovery done. */ - for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start); + for (cl_idx = 0, off = msd->msd_client_start; off < last_size; cl_idx++) { __u64 last_transno; struct obd_export *exp; struct mdt_export_data *med; if (!mcd) { - OBD_ALLOC_WAIT(mcd, sizeof(*mcd)); + OBD_ALLOC_PTR(mcd); if (!mcd) RETURN(-ENOMEM); } - off = le32_to_cpu(msd->msd_client_start) + - cl_idx * le16_to_cpu(msd->msd_client_size); + off = msd->msd_client_start + + cl_idx * msd->msd_client_size; - rc = mdt_read_last_rcvd(ctx, mdt, mcd, &off); + rc = mdt_last_rcvd_read(ctx, mdt, mcd, &off); if (rc) { CERROR("error reading MDS %s idx %d, off %llu: rc %d\n", LAST_RCVD, cl_idx, off, rc); @@ -196,16 +312,14 @@ static int mdt_init_clients_data(const struct lu_context *ctx, */ CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx, - last_transno, le64_to_cpu(msd->msd_last_transno), + last_transno, msd->msd_last_transno, mcd_last_xid(mcd)); exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid); if (IS_ERR(exp)) { rc = 0; continue; - /* FIXME: Do we really want to return error? - GOTO(err_client, rc = PTR_ERR(exp)); - */ + /* FIXME: Do we really want to return error? */ } med = &exp->exp_mdt_data; @@ -221,7 +335,7 @@ static int mdt_init_clients_data(const struct lu_context *ctx, CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n", cl_idx, last_transno); - + /* protect __u64 value update */ spin_lock(&mdt->mdt_transno_lock); if (last_transno > mdt->mdt_last_transno) mdt->mdt_last_transno = last_transno; @@ -233,7 +347,7 @@ static int mdt_init_clients_data(const struct lu_context *ctx, RETURN(rc); } -static int mdt_init_server_data(const struct lu_context *ctx, +static int mdt_server_data_init(const struct lu_context *ctx, struct mdt_device *mdt) { struct mdt_server_data *msd = &mdt->mdt_msd; @@ -242,15 +356,15 @@ static int mdt_init_server_data(const struct lu_context *ctx, struct mdt_thread_info *mti; struct dt_object *obj; struct lu_attr *la; - unsigned long last_rcvd_size = 0; + unsigned long last_rcvd_size; __u64 mount_count; - int rc = 0; + int rc; ENTRY; /* ensure padding in the struct is the correct size */ - LASSERT(offsetof(struct mdt_server_data, msd_padding) + + CLASSERT(offsetof(struct mdt_server_data, msd_padding) + sizeof(msd->msd_padding) == LR_SERVER_SIZE); - LASSERT(offsetof(struct mdt_client_data, mcd_padding) + + CLASSERT(offsetof(struct mdt_client_data, mcd_padding) + sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE); mti = lu_context_key_get(ctx, &mdt_thread_key); @@ -273,14 +387,14 @@ static int mdt_init_server_data(const struct lu_context *ctx, sizeof(msd->msd_uuid)); msd->msd_last_transno = 0; msd->msd_mount_count = 0; - msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE); - msd->msd_client_start = cpu_to_le32(LR_CLIENT_START); - msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE); - msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID); - msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT | - OBD_INCOMPAT_COMMON_LR); + msd->msd_server_size = LR_SERVER_SIZE; + msd->msd_client_start = LR_CLIENT_START; + msd->msd_client_size = LR_CLIENT_SIZE; + msd->msd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID; + msd->msd_feature_incompat = OBD_INCOMPAT_MDT | + OBD_INCOMPAT_COMMON_LR; } else { - rc = mdt_read_last_rcvd_header(ctx, mdt, msd); + rc = mdt_last_rcvd_header_read(ctx, mdt, msd); if (rc) { CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc); GOTO(out, rc); @@ -293,8 +407,8 @@ static int mdt_init_server_data(const struct lu_context *ctx, GOTO(out, rc = -EINVAL); } } - mount_count = le64_to_cpu(msd->msd_mount_count); - + mount_count = msd->msd_mount_count; +#if 0 if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) { CERROR("%s: unsupported incompat filesystem feature(s) %x\n", obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) & @@ -317,28 +431,30 @@ static int mdt_init_server_data(const struct lu_context *ctx, msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR); */ } - msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT); +#endif + msd->msd_feature_compat = OBD_COMPAT_MDT; spin_lock(&mdt->mdt_transno_lock); - mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno); + mdt->mdt_last_transno = msd->msd_last_transno; spin_unlock(&mdt->mdt_transno_lock); + CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n"); CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n", obd->obd_name, mdt->mdt_last_transno); CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n", obd->obd_name, mount_count + 1); CDEBUG(D_INODE, "%s: server data size: %u\n", - obd->obd_name, le32_to_cpu(msd->msd_server_size)); + obd->obd_name, msd->msd_server_size); CDEBUG(D_INODE, "%s: per-client data start: %u\n", - obd->obd_name, le32_to_cpu(msd->msd_client_start)); + obd->obd_name, msd->msd_client_start); CDEBUG(D_INODE, "%s: per-client data size: %u\n", - obd->obd_name, le32_to_cpu(msd->msd_client_size)); + obd->obd_name, msd->msd_client_size); CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n", obd->obd_name, last_rcvd_size); CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name, - last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 : - (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) / - le16_to_cpu(msd->msd_client_size)); + last_rcvd_size <= msd->msd_client_start ? 0 : + (last_rcvd_size - msd->msd_client_start) / + msd->msd_client_size); CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n"); if (!msd->msd_server_size || !msd->msd_client_start || @@ -347,7 +463,7 @@ static int mdt_init_server_data(const struct lu_context *ctx, GOTO(out, rc = -EINVAL); } - rc = mdt_init_clients_data(ctx, mdt, last_rcvd_size); + rc = mdt_clients_data_init(ctx, mdt, last_rcvd_size); if (rc) GOTO(err_client, rc); @@ -370,10 +486,10 @@ static int mdt_init_server_data(const struct lu_context *ctx, } mdt->mdt_mount_count++; - msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count); + msd->msd_mount_count = mdt->mdt_mount_count; /* save it, so mount count and last_transno is current */ - rc = mdt_update_server_data(ctx, mdt); + rc = mdt_server_data_update(ctx, mdt); if (rc) GOTO(err_client, rc); @@ -385,24 +501,78 @@ out: return rc; } -static int mdt_update_server_data(const struct lu_context *ctx, +static int mdt_server_data_update(const struct lu_context *ctx, struct mdt_device *mdt) { struct mdt_server_data *msd = &mdt->mdt_msd; - int rc = 0; + int rc; ENTRY; CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n", mdt->mdt_mount_count, mdt->mdt_last_transno); spin_lock(&mdt->mdt_transno_lock); - msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno); + msd->msd_last_transno = mdt->mdt_last_transno; spin_unlock(&mdt->mdt_transno_lock); - rc = mdt_write_last_rcvd_header(ctx, mdt, msd, NULL); + rc = mdt_last_rcvd_header_write(ctx, mdt, msd); RETURN(rc); } +#if 0 +int mdt_client_new(const struct lu_context *ctx, + struct mdt_device *mdt, + struct mdt_export_data *med) +{ + unsigned long *bitmap = mdt->mdt_client_bitmap; + struct mdt_client_data *mcd = med->med_mcd; + struct mdt_server_data *msd = &mdt->mdt_msd; + int rc = 0; + ENTRY; + + LASSERT(bitmap != NULL); + + /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so + * there's no need for extra complication here + */ + cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS); +repeat: + if (cl_idx >= LR_MAX_CLIENTS || + MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) { + CERROR("no room for clients - fix LR_MAX_CLIENTS\n"); + return -EOVERFLOW; + } + if (test_and_set_bit(cl_idx, bitmap)) { + cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, + cl_idx); + goto repeat; + } + CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n", + cl_idx, med->med_mcd->mcd_uuid); + + med->med_lr_idx = cl_idx; + med->med_lr_off = msd->msd_client_start + + (cl_idx * msd->msd_client_size); + init_mutex(&med->med_mcd_lock); + + LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off); + + if (new_client) { + loff_t off = med->med_lr_off; + struct thandle *th; + th = mdt_trans_start(ctx, mdt); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th); + CDEBUG(D_INFO, + "wrote client mcd at idx %u off %llu (len %u)\n", + cl_idx, med->med_lr_off, sizeof(*mcd)); + mdt_trans_stop(ctx, mdt, th); + } + RETURN(rc); +} +#endif /* Add client data to the MDS. We use a bitmap to locate a free space * in the last_rcvd file if cl_off is -1 (i.e. a new client). * Otherwise, we just have to read the data from the last_rcvd file and @@ -453,28 +623,35 @@ int mdt_client_add(const struct lu_context *ctx, cl_idx, med->med_mcd->mcd_uuid); med->med_lr_idx = cl_idx; - med->med_lr_off = le32_to_cpu(msd->msd_client_start) + - (cl_idx * le16_to_cpu(msd->msd_client_size)); + med->med_lr_off = msd->msd_client_start + + (cl_idx * msd->msd_client_size); init_mutex(&med->med_mcd_lock); LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off); if (new_client) { loff_t off = med->med_lr_off; - rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL); + struct thandle *th; + th = mdt_trans_start(ctx, mdt); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th); CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n", cl_idx, med->med_lr_off, sizeof(*mcd)); + mdt_trans_stop(ctx, mdt, th); } RETURN(rc); } int mdt_client_free(const struct lu_context *ctx, - struct mdt_device *mdt, - struct mdt_export_data *med) + struct mdt_device *mdt, + struct mdt_export_data *med) { struct mdt_client_data *mcd = med->med_mcd; struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd; + struct thandle *th; loff_t off; int rc = 0; ENTRY; @@ -507,10 +684,16 @@ int mdt_client_free(const struct lu_context *ctx, LBUG(); } + th = mdt_trans_start(ctx, mdt); + if (IS_ERR(th)) + GOTO(free, rc = PTR_ERR(th)); + mutex_down(&med->med_mcd_lock); memset(mcd, 0, sizeof *mcd); - rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL); + + rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th); mutex_up(&med->med_mcd_lock); + mdt_trans_stop(ctx, mdt, th); CDEBUG(rc == 0 ? D_INFO : D_ERROR, "zeroing out client idx %u in %s rc %d\n", @@ -525,7 +708,7 @@ int mdt_client_free(const struct lu_context *ctx, /* Make sure the server's last_transno is up to date. Do this * after the client is freed so we know all the client's * transactions have been committed. */ - mdt_update_server_data(ctx, mdt); + mdt_server_data_update(ctx, mdt); EXIT; free: @@ -537,7 +720,7 @@ free: /* * last_rcvd & last_committed update callbacks */ -static int mdt_update_last_rcvd(struct mdt_thread_info *mti, +static int mdt_last_rcvd_update(struct mdt_thread_info *mti, struct thandle *th) { struct mdt_device *mdt = mti->mti_mdt; @@ -567,23 +750,23 @@ static int mdt_update_last_rcvd(struct mdt_thread_info *mti, off = med->med_lr_off; mutex_down(&med->med_mcd_lock); if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) { - mcd->mcd_last_close_transno = cpu_to_le64(mti->mti_transno); - mcd->mcd_last_close_xid = cpu_to_le64(req->rq_xid); - mcd->mcd_last_close_result = cpu_to_le32(rc); + mcd->mcd_last_close_transno = mti->mti_transno; + mcd->mcd_last_close_xid = req->rq_xid; + mcd->mcd_last_close_result = rc; } else { - mcd->mcd_last_transno = cpu_to_le64(mti->mti_transno); - mcd->mcd_last_xid = cpu_to_le64(req->rq_xid); - mcd->mcd_last_result = cpu_to_le32(rc); + mcd->mcd_last_transno = mti->mti_transno; + mcd->mcd_last_xid = req->rq_xid; + mcd->mcd_last_result = rc; /*XXX: save intent_disposition in mdt_thread_info? * also there is bug - intent_dispostion is __u64, * see struct ldlm_reply->lock_policy_res1; */ - mcd->mcd_last_data = cpu_to_le32(mti->mti_opdata); + mcd->mcd_last_data = mti->mti_opdata; } if (off <= 0) { CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off); err = -EINVAL; } else { - err = mdt_write_last_rcvd(mti->mti_ctxt, mdt, mcd, &off, th); + err = mdt_last_rcvd_write(mti->mti_ctxt, mdt, mcd, &off, th); } mutex_up(&med->med_mcd_lock); RETURN(err); @@ -606,7 +789,7 @@ static int mdt_txn_start_cb(const struct lu_context *ctx, static inline __u64 req_exp_last_xid(struct ptlrpc_request *req) { - return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid); + return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid; } /* Update last_rcvd records with latests transaction data */ @@ -664,11 +847,11 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx, txi->txi_transno = mti->mti_transno; spin_unlock(&mdt->mdt_transno_lock); - return mdt_update_last_rcvd(mti, txn); + return mdt_last_rcvd_update(mti, txn); } /* commit callback, need to update last_commited value */ -static int mdt_txn_commit_cb(const struct lu_context *ctxt, +static int mdt_txn_commit_cb(const struct lu_context *ctx, struct thandle *txn, void *cookie) { struct mdt_device *mdt = cookie; @@ -711,7 +894,7 @@ int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt) LAST_RCVD, &last_fid); if(!IS_ERR(last)) { mdt->mdt_last_rcvd = last; - rc = mdt_init_server_data(ctx, mdt); + rc = mdt_server_data_init(ctx, mdt); if (rc) { lu_object_put(ctx, &last->do_lu); mdt->mdt_last_rcvd = NULL; -- 1.8.3.1