From: huanghua Date: Thu, 6 Jul 2006 07:15:26 +0000 (+0000) Subject: add prototype code for open, just passed compilation, will be revisioned later. X-Git-Tag: v1_8_0_110~486^2~1477 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=2a49bca6cf0be9224a25029a5c96c6c693f6cbf2;p=fs%2Flustre-release.git add prototype code for open, just passed compilation, will be revisioned later. add prototype code for handling last_rcvd. --- diff --git a/lustre/mdt/Makefile.in b/lustre/mdt/Makefile.in index 26c17e2..46168c8 100644 --- a/lustre/mdt/Makefile.in +++ b/lustre/mdt/Makefile.in @@ -1,4 +1,4 @@ MODULES := mdt -mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o +mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_fs.o mdt_open.o @INCLUDE_RULES@ diff --git a/lustre/mdt/mdt_fs.c b/lustre/mdt/mdt_fs.c new file mode 100644 index 0000000..239c578 --- /dev/null +++ b/lustre/mdt/mdt_fs.c @@ -0,0 +1,421 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * linux/mdt/mdt_open.c + * Lustre Metadata Target (mdt) open/close file handling + * + * Copyright (C) 2002-2006 Cluster File Systems, Inc. + * Author: Huang Hua + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. + */ +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_MDS + +#include "mdt_internal.h" + +/* Add client data to the MDS. We use a bitmap to locate a free space + * in the last_rcvd file if cl_off is -1 (i.e. a new client). + * Otherwise, we just have to read the data from the last_rcvd file and + * we know its offset. + * + * It should not be possible to fail adding an existing client - otherwise + * mdt_init_server_data() callsite needs to be fixed. + */ +int mdt_client_add(const struct lu_context *ctxt, + struct mdt_device *mdt, + struct mdt_export_data *med, + int cl_idx) +{ + unsigned long *bitmap = mdt->mdt_client_bitmap; + struct mdt_client_data *mcd = med->med_mcd; + struct mdt_server_data *msd = &mdt->mdt_msd; + int new_client = (cl_idx == -1); + ENTRY; + + LASSERT(bitmap != NULL); + LASSERTF(cl_idx > -2, "%d\n", cl_idx); + + /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so + * there's no need for extra complication here + */ + if (new_client) { + cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS); + repeat: + if (cl_idx >= LR_MAX_CLIENTS || + OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) { + CERROR("no room for clients - fix LR_MAX_CLIENTS\n"); + return -EOVERFLOW; + } + if (test_and_set_bit(cl_idx, bitmap)) { + cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, + cl_idx); + goto repeat; + } + } else { + if (test_and_set_bit(cl_idx, bitmap)) { + CERROR("MDS client %d: bit already set in bitmap!!\n", + cl_idx); + LBUG(); + } + } + + CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n", + cl_idx, med->med_mcd->mcd_uuid); + + med->med_lr_idx = cl_idx; + med->med_lr_off = le32_to_cpu(msd->msd_client_start) + + (cl_idx * le16_to_cpu(msd->msd_client_size)); + LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off); + + if (new_client) { + loff_t off = med->med_lr_off; + int rc = 0; + + rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt, + mdt->mdt_last, + mcd, sizeof(*mcd), + &off); + + if (rc) + return rc; + CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n", + cl_idx, off, sizeof(mcd)); + } + return 0; +} + +int mdt_update_server_data(const struct lu_context *ctxt, + struct mdt_device *mdt, + int sync) +{ + struct mdt_server_data *msd = &mdt->mdt_msd; + loff_t off = 0; + int rc = 0; + ENTRY; + + CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n", + mdt->mdt_mount_count, mdt->mdt_last_transno); + + msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno); + rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt, + mdt->mdt_last, + msd, + sizeof(*msd), &off); + RETURN(rc); + +} + +int mdt_client_free(const struct lu_context *ctxt, + struct mdt_device *mdt, + struct mdt_export_data *med) +{ + struct mdt_client_data *mcd = med->med_mcd; + int rc; + loff_t off; + ENTRY; + + if (!mcd) + RETURN(0); + + CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n", + med->med_lr_idx, med->med_lr_off, mcd->mcd_uuid); + + off = med->med_lr_off; + + /* Don't clear med_lr_idx here as it is likely also unset. At worst + * we leak a client slot that will be cleaned on the next recovery. */ + if (off <= 0) { + CERROR("client idx %d has offset %lld\n", + med->med_lr_idx, off); + GOTO(free, rc = -EINVAL); + } + + /* Clear the bit _after_ zeroing out the client so we don't + race with mdt_client_add and zero out new clients.*/ + if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) { + CERROR("MDT client %u: bit already clear in bitmap!!\n", + med->med_lr_idx); + LBUG(); + } + +// if (!(exp->exp_flags & OBD_OPT_FAILOVER)) { + memset(&mcd, 0, sizeof *mcd); + rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt, + mdt->mdt_last, + mcd, + sizeof(*mcd), &off); + CDEBUG_EX(rc == 0 ? D_INFO : D_ERROR, + "zeroing out client idx %u in %s rc %d\n", + med->med_lr_idx, LAST_RCVD, rc); + + if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) { + CERROR("MDS client %u: bit already clear in bitmap!!\n", + med->med_lr_idx); + LBUG(); + } + + /* Make sure the server's last_transno is up to date. Do this + * after the client is freed so we know all the client's + * transactions have been committed. */ + mdt_update_server_data(ctxt, mdt, 0); + + EXIT; +free: + OBD_FREE_PTR(mcd); + med->med_mcd = NULL; + return 0; +} + +static int mdt_init_server_data(const struct lu_context *ctxt, + struct mdt_device *mdt) +{ + struct mdt_server_data *msd = &mdt->mdt_msd; + struct mdt_client_data *mcd = NULL; + struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd; + loff_t off = 0; + unsigned long last_rcvd_size = 0; // = getsize(mdt->mdt_last) + __u64 mount_count; + int cl_idx; + int rc = 0; + ENTRY; + + /* ensure padding in the struct is the correct size */ + LASSERT(offsetof(struct mdt_server_data, msd_padding) + + sizeof(msd->msd_padding) == LR_SERVER_SIZE); + LASSERT(offsetof(struct mdt_client_data, mcd_padding) + + sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE); + + if (last_rcvd_size == 0) { + LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name); + + memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid)); + msd->msd_last_transno = 0; + mount_count = msd->msd_mount_count = 0; + msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE); + msd->msd_client_start = cpu_to_le32(LR_CLIENT_START); + msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE); + msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID); + msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT | + OBD_INCOMPAT_COMMON_LR); + } else { + rc = mdt->mdt_last->do_body_ops->dbo_read(ctxt, + mdt->mdt_last, + msd, + sizeof(*msd), &off); + if (rc) { + CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc); + GOTO(out, rc); + } + if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) { + LCONSOLE_ERROR("Trying to start OBD %s using the wrong" + " disk %s. Were the /dev/ assignments " + "rearranged?\n", + obd->obd_uuid.uuid, msd->msd_uuid); + GOTO(out, rc = -EINVAL); + } + mount_count = le64_to_cpu(msd->msd_mount_count); + } + + if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) { + CERROR("%s: unsupported incompat filesystem feature(s) %x\n", + obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) & + ~MDT_INCOMPAT_SUPP); + GOTO(out, rc = -EINVAL); + } + if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) { + CERROR("%s: unsupported read-only filesystem feature(s) %x\n", + obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) & + ~MDT_ROCOMPAT_SUPP); + /* Do something like remount filesystem read-only */ + GOTO(out, rc = -EINVAL); + } + if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){ + CDEBUG(D_WARNING, "using old last_rcvd format\n"); + msd->msd_mount_count = msd->msd_last_transno; + msd->msd_last_transno = msd->msd_unused; + /* If we update the last_rcvd, we can never go back to + an old install, so leave this in the old format for now. + msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR); + */ + } + msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT); + + mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno); + + CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n", + obd->obd_name, mdt->mdt_last_transno); + CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n", + obd->obd_name, mount_count + 1); + CDEBUG(D_INODE, "%s: server data size: %u\n", + obd->obd_name, le32_to_cpu(msd->msd_server_size)); + CDEBUG(D_INODE, "%s: per-client data start: %u\n", + obd->obd_name, le32_to_cpu(msd->msd_client_start)); + CDEBUG(D_INODE, "%s: per-client data size: %u\n", + obd->obd_name, le32_to_cpu(msd->msd_client_size)); + CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n", + obd->obd_name, last_rcvd_size); + CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name, + last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 : + (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) / + le16_to_cpu(msd->msd_client_size)); + + if (!msd->msd_server_size || !msd->msd_client_start || + !msd->msd_client_size) { + CERROR("Bad last_rcvd contents!\n"); + GOTO(out, rc = -EINVAL); + } + + /* When we do a clean MDS shutdown, we save the last_transno into + * the header. If we find clients with higher last_transno values + * then those clients may need recovery done. */ + for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start); + off < last_rcvd_size; cl_idx++) { + __u64 last_transno; + struct obd_export *exp; + struct mdt_export_data *med; + + if (!mcd) { + OBD_ALLOC_PTR(mcd); + if (!mcd) + GOTO(err_client, rc = -ENOMEM); + } + + off = le32_to_cpu(msd->msd_client_start) + + cl_idx * le16_to_cpu(msd->msd_client_size); + rc = mdt->mdt_last->do_body_ops->dbo_read(ctxt, + mdt->mdt_last, + mcd, + sizeof(*mcd), &off); + if (rc) { + CERROR("error reading MDS %s idx %d, off %llu: rc %d\n", + LAST_RCVD, cl_idx, off, rc); + break; /* read error shouldn't cause startup to fail */ + } + + if (mcd->mcd_uuid[0] == '\0') { + CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", + cl_idx); + continue; + } + + last_transno = le64_to_cpu(mcd->mcd_last_transno); + + /* These exports are cleaned up by mdt_obd_disconnect(), so they + * need to be set up like real exports as mdt_obd_connect() does. + */ + CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 + " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx, + last_transno, le64_to_cpu(msd->msd_last_transno), + le64_to_cpu(mcd->mcd_last_xid)); + + exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid); + if (IS_ERR(exp)) + GOTO(err_client, rc = PTR_ERR(exp)); + + med = &exp->exp_mdt_data; + med->med_mcd = mcd; + rc = mdt_client_add(ctxt, mdt, med, cl_idx); + LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */ + + mcd = NULL; + exp->exp_replay_needed = 1; + exp->exp_connecting = 0; + obd->obd_recoverable_clients++; + obd->obd_max_recoverable_clients++; + class_export_put(exp); + + CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n", + cl_idx, last_transno); + + if (last_transno > mdt->mdt_last_transno) + mdt->mdt_last_transno = last_transno; + } + + if (mcd) + OBD_FREE_PTR(mcd); + + obd->obd_last_committed = mdt->mdt_last_transno; + + if (obd->obd_recoverable_clients) { + CWARN("RECOVERY: service %s, %d recoverable clients, " + "last_transno "LPU64"\n", obd->obd_name, + obd->obd_recoverable_clients, mdt->mdt_last_transno); + obd->obd_next_recovery_transno = obd->obd_last_committed + 1; + obd->obd_recovering = 1; + obd->obd_recovery_start = CURRENT_SECONDS; + /* Only used for lprocfs_status */ + obd->obd_recovery_end = obd->obd_recovery_start + + OBD_RECOVERY_TIMEOUT; + } + + mdt->mdt_mount_count = mount_count + 1; + msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count); + + /* save it, so mount count and last_transno is current */ + rc = mdt_update_server_data(ctxt, mdt, 1); + if (rc) + GOTO(err_client, rc); + + RETURN(0); + +err_client: + class_disconnect_exports(obd); +out: + return rc; +} + +int mdt_fs_setup(const struct lu_context *ctxt, + struct mdt_device *mdt) +{ + struct lu_fid last_fid; + struct dt_object *last; + int rc; + ENTRY; + + last = dt_store_open(ctxt, mdt->mdt_bottom, LAST_RCVD, &last_fid); + if(!IS_ERR(last)) { + mdt->mdt_last = last; + rc = mdt_init_server_data(ctxt, mdt); + if (rc) { + lu_object_put(ctxt, &last->do_lu); + mdt->mdt_last = NULL; + } + } else { + CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc); + rc = PTR_ERR(last); + } + return rc; +} + + +void mdt_fs_cleanup(const struct lu_context *ctxt, + struct mdt_device *mdt) +{ + struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd; + + class_disconnect_exports(obd); /* cleans up client info too */ + + if (mdt->mdt_last) + lu_object_put(ctxt, &mdt->mdt_last->do_lu); + mdt->mdt_last = NULL; +} + diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index b9a9bab..9536238 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -553,33 +553,6 @@ static int mdt_reint(struct mdt_thread_info *info) RETURN(rc); } -static int mdt_close(struct mdt_thread_info *info) -{ -#ifdef MDT_CODE - /* TODO: dual to open handling, orphan handling */ - struct mdt_body * reqbody; - struct mdt_body * repbody; - - reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY); - repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - -#endif - return -EOPNOTSUPP; -} - -static int mdt_done_writing(struct mdt_thread_info *info) -{ - return -EOPNOTSUPP; -} - -static int mdt_pin(struct mdt_thread_info *info) -{ -#ifdef MDT_CODE - /* TODO: This is open handling. */ -#endif - return -EOPNOTSUPP; -} - #ifdef MDT_CODE /* TODO these two methods not available now. */ @@ -805,16 +778,6 @@ struct mdt_object *mdt_object_find(const struct lu_context *ctxt, return mdt_obj(o); } -void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *o) -{ - lu_object_put(ctxt, &o->mot_obj.mo_lu); -} - -const struct lu_fid *mdt_object_fid(struct mdt_object *o) -{ - return lu_object_fid(&o->mot_obj.mo_lu); -} - int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits) { @@ -2055,6 +2018,8 @@ static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m) struct lu_site *ls = d->ld_site; ENTRY; + + mdt_fs_cleanup(ctx, m); mdt_stop_ptlrpc_service(m); @@ -2145,8 +2110,13 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m, if (rc) GOTO(err_free_ns, rc); - RETURN(0); + rc = mdt_fs_setup(ctx, m); + if (rc) + GOTO(err_stop_service, rc); + RETURN(0); +err_stop_service: + mdt_stop_ptlrpc_service(m); err_free_ns: ldlm_namespace_free(m->mdt_namespace, 0); m->mdt_namespace = NULL; @@ -2240,18 +2210,18 @@ static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o) EXIT; } -static int mdt_object_exists(const struct lu_context *ctx, - const struct lu_object *o) -{ - return lu_object_exists(ctx, lu_object_next(o)); -} - static int mdt_object_print(const struct lu_context *ctxt, struct seq_file *f, const struct lu_object *o) { return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o); } +int mdt_object_exists(const struct lu_context *ctx, + const struct lu_object *o) +{ + return lu_object_exists(ctx, lu_object_next(o)); +} + static struct lu_device_operations mdt_lu_ops = { .ldo_object_alloc = mdt_object_alloc, .ldo_process_config = mdt_process_config @@ -2376,6 +2346,64 @@ static int mdt_obd_disconnect(struct obd_export *exp) RETURN(rc); } +static int mdt_init_export(struct obd_export *exp) +{ + struct mdt_export_data *med = &exp->exp_mdt_data; + + INIT_LIST_HEAD(&med->med_open_head); + spin_lock_init(&med->med_open_lock); + exp->exp_connecting = 1; + RETURN(0); +} + +static int mdt_destroy_export(struct obd_export *export) +{ + struct mdt_export_data *med; + struct obd_device *obd = export->exp_obd; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + struct lu_context ctxt; + int rc = 0; + ENTRY; + + med = &export->exp_mdt_data; + target_destroy_export(export); + + if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) + RETURN(0); + + rc = lu_context_init(&ctxt); + if (rc) + RETURN(rc); + + lu_context_enter(&ctxt); + /* Close any open files (which may also cause orphan unlinking). */ + spin_lock(&med->med_open_lock); + while (!list_empty(&med->med_open_head)) { + struct list_head *tmp = med->med_open_head.next; + struct mdt_file_data *mfd = + list_entry(tmp, struct mdt_file_data, mfd_list); + + /* Remove mfd handle so it can't be found again. + * We are consuming the mfd_list reference here. */ + class_handle_unhash(&mfd->mfd_handle); + list_del_init(&mfd->mfd_list); + spin_unlock(&med->med_open_lock); + + rc = mdt_mfd_close(&ctxt, mfd, 0); + + if (rc) + CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc); + spin_lock(&med->med_open_lock); + } + spin_unlock(&med->med_open_lock); + mdt_client_free(&ctxt, mdt, med); + + lu_context_exit(&ctxt); + lu_context_fini(&ctxt); + + RETURN(rc); +} + static int mdt_notify(struct obd_device *obd, struct obd_device *watched, enum obd_notify_event ev, void *data) { @@ -2402,10 +2430,12 @@ out: } static struct obd_ops mdt_obd_device_ops = { - .o_owner = THIS_MODULE, - .o_connect = mdt_obd_connect, - .o_disconnect = mdt_obd_disconnect, - .o_notify = mdt_notify, + .o_owner = THIS_MODULE, + .o_connect = mdt_obd_connect, + .o_disconnect = mdt_obd_disconnect, + .o_init_export = mdt_init_export, /* By Huang Hua*/ + .o_destroy_export = mdt_destroy_export, /* By Huang Hua*/ + .o_notify = mdt_notify, }; static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d) diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 84e9c7a..0e83c69 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -10,6 +10,7 @@ * Author: Phil Schwan * Author: Mike Shaver * Author: Nikita Danilov + * Author: Huang Hua * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. @@ -46,12 +47,57 @@ */ #include #include +#include #include #include #include /* LR_CLIENT_SIZE, etc. */ #include + +/* Data stored per client in the last_rcvd file. In le32 order. */ +struct mdt_client_data { + __u8 mcd_uuid[40]; /* client UUID */ + __u64 mcd_last_transno; /* last completed transaction ID */ + __u64 mcd_last_xid; /* xid for the last transaction */ + __u32 mcd_last_result; /* result from last RPC */ + __u32 mcd_last_data; /* per-op data (disposition for open &c.) */ + __u8 mcd_padding[LR_CLIENT_SIZE - 64]; +}; + +/* copied from lr_server_data. + * mds data stored at the head of last_rcvd file. In le32 order. */ +struct mdt_server_data { + __u8 msd_uuid[40]; /* server UUID */ + __u64 msd_unused; /* was fsd_last_objid - don't use for now */ + __u64 msd_last_transno; /* last completed transaction ID */ + __u64 msd_mount_count; /* incarnation number */ + __u32 msd_feature_compat; /* compatible feature flags */ + __u32 msd_feature_rocompat;/* read-only compatible feature flags */ + __u32 msd_feature_incompat;/* incompatible feature flags */ + __u32 msd_server_size; /* size of server data area */ + __u32 msd_client_start; /* start of per-client data area */ + __u16 msd_client_size; /* size of per-client data area */ + __u16 msd_subdir_count; /* number of subdirectories for objects */ + __u64 msd_catalog_oid; /* recovery catalog object id */ + __u32 msd_catalog_ogen; /* recovery catalog inode generation */ + __u8 msd_peeruuid[40]; /* UUID of MDS associated with this OST */ + __u32 msd_ost_index; /* index number of OST in LOV */ + __u32 msd_mdt_index; /* index number of MDT in LMV */ + __u8 msd_padding[LR_SERVER_SIZE - 148]; +}; + +struct mdt_object; +/* file data for open files on MDS */ +struct mdt_file_data { + struct portals_handle mfd_handle; /* must be first */ + atomic_t mfd_refcount; + struct list_head mfd_list; /* protected by med_open_lock */ + __u64 mfd_xid; + int mfd_mode; + struct mdt_object *mfd_object; +}; + struct mdt_device { /* super-class */ struct md_device mdt_md_dev; @@ -83,24 +129,17 @@ struct mdt_device { * or should be placed somewhere else. */ int mdt_max_mdsize; int mdt_max_cookiesize; - + __u64 mdt_mount_count; + + struct mdt_server_data mdt_msd; + unsigned long mdt_client_bitmap[LR_MAX_CLIENTS / sizeof(long)]; + struct dt_object *mdt_last; }; -/* Data stored per client in the last_rcvd file. In le32 order. */ -struct mdt_client_data { - __u8 mcd_uuid[40]; /* client UUID */ - __u64 mcd_last_transno; /* last completed transaction ID */ - __u64 mcd_last_xid; /* xid for the last transaction */ - __u32 mcd_last_result; /* result from last RPC */ - __u32 mcd_last_data; /* per-op data (disposition for open &c.) */ - __u8 mcd_padding[LR_CLIENT_SIZE - 64]; -}; -#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000) -static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m) -{ - LASSERT(m->mdt_child); - return m->mdt_child->md_ops; -} +/*XXX copied from mds_internal.h */ +#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000) +#define MDT_ROCOMPAT_SUPP (OBD_ROCOMPAT_LOVOBJID) +#define MDT_INCOMPAT_SUPP (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR) enum mdt_flags { /* @@ -115,22 +154,11 @@ struct mdt_object { struct md_object mot_obj; }; -static inline struct md_object *mdt_object_child(struct mdt_object *o) -{ - return lu2md(lu_object_next(&o->mot_obj.mo_lu)); -} - struct mdt_lock_handle { struct lustre_handle mlh_lh; ldlm_mode_t mlh_mode; }; -void mdt_lock_handle_init(struct mdt_lock_handle *lh); -void mdt_lock_handle_fini(struct mdt_lock_handle *lh); - -int md_device_init(struct md_device *md, struct lu_device_type *t); -void md_device_fini(struct md_device *md); - enum { MDT_REP_BUF_NR_MAX = 8 }; @@ -220,23 +248,51 @@ struct mdt_thread_info { struct mdt_reint_reply mti_reint_rep; }; -int fid_lock(struct ldlm_namespace *, const struct lu_fid *, - struct lustre_handle *, ldlm_mode_t, - ldlm_policy_data_t *); +static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m) +{ + LASSERT(m->mdt_child); + return m->mdt_child->md_ops; +} + +static inline struct md_object *mdt_object_child(struct mdt_object *o) +{ + return lu2md(lu_object_next(&o->mot_obj.mo_lu)); +} + +static inline struct ptlrpc_request *mdt_info_req(struct mdt_thread_info *info) +{ + return info->mti_pill.rc_req; +} + +static inline void mdt_object_get(const struct lu_context *ctxt, + struct mdt_object *o) +{ + lu_object_get(&o->mot_obj.mo_lu); +} -void fid_unlock(struct ldlm_namespace *, const struct lu_fid *, - struct lustre_handle *, ldlm_mode_t); +static inline void mdt_object_put(const struct lu_context *ctxt, + struct mdt_object *o) +{ + lu_object_put(ctxt, &o->mot_obj.mo_lu); +} -struct mdt_object *mdt_object_find(const struct lu_context *, - struct mdt_device *, const struct lu_fid *); -void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *); +static inline const struct lu_fid *mdt_object_fid(struct mdt_object *o) +{ + return lu_object_fid(&o->mot_obj.mo_lu); +} -int mdt_object_lock(struct ldlm_namespace *, struct mdt_object *, - struct mdt_lock_handle *, __u64); +int mdt_object_lock(struct ldlm_namespace *, + struct mdt_object *, + struct mdt_lock_handle *, + __u64); -void mdt_object_unlock(struct ldlm_namespace *, struct mdt_object *, +void mdt_object_unlock(struct ldlm_namespace *, + struct mdt_object *, struct mdt_lock_handle *); +struct mdt_object *mdt_object_find(const struct lu_context *, + struct mdt_device *, + const struct lu_fid *); struct mdt_object *mdt_object_find_lock(const struct lu_context *, struct mdt_device *, const struct lu_fid *, @@ -245,13 +301,53 @@ struct mdt_object *mdt_object_find_lock(const struct lu_context *, int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op); int mdt_reint_rec(struct mdt_thread_info *); void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr); -const struct lu_fid *mdt_object_fid(struct mdt_object *o); -static inline struct ptlrpc_request *mdt_info_req (struct mdt_thread_info *info) -{ - return info->mti_pill.rc_req; -} + int mdt_getxattr(struct mdt_thread_info *info); int mdt_setxattr(struct mdt_thread_info *info); +void mdt_lock_handle_init(struct mdt_lock_handle *lh); +void mdt_lock_handle_fini(struct mdt_lock_handle *lh); + + +int mdt_object_exists(const struct lu_context *ctx, + const struct lu_object *o); + +int mdt_fs_setup(const struct lu_context *ctxt, + struct mdt_device *mdt); + +void mdt_fs_cleanup(const struct lu_context *ctxt, + struct mdt_device *mdt); + +int mdt_client_free(const struct lu_context *ctxt, + struct mdt_device *mdt, + struct mdt_export_data *med); + +int mdt_update_server_data(const struct lu_context *ctxt, + struct mdt_device *mdt, + int sync); + +int mdt_client_add(const struct lu_context *ctxt, + struct mdt_device *mdt, + struct mdt_export_data *med, + int cl_idx); + +int mdt_pin(struct mdt_thread_info* info); + +int mdt_lock_new_child(struct mdt_thread_info *info, + struct mdt_object *o, + struct mdt_lock_handle *child_lockh); + +int mdt_reint_open(struct mdt_thread_info *info); + +int mdt_mfd_close(const struct lu_context *ctxt, + struct mdt_file_data *mfd, + int unlink_orphan); + +int mdt_close(struct mdt_thread_info *info); + +int mdt_done_writing(struct mdt_thread_info *info); + + + #endif /* __KERNEL__ */ #endif /* _MDT_H */ diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c new file mode 100644 index 0000000..9cff0e5 --- /dev/null +++ b/lustre/mdt/mdt_open.c @@ -0,0 +1,377 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * linux/mdt/mdt_open.c + * Lustre Metadata Target (mdt) open/close file handling + * + * Copyright (C) 2002-2006 Cluster File Systems, Inc. + * Author: Huang Hua + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_MDS + +#include "mdt_internal.h" + +/* + * MDS file data handling: file data holds a handle for a file opened + * by a client. + */ + +static void mdt_mfd_get(void *mfdp) +{ + struct mdt_file_data *mfd = mfdp; + + atomic_inc(&mfd->mfd_refcount); + CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd, + atomic_read(&mfd->mfd_refcount)); +} + +/* Create a new mdt_file_data struct. + * reference is set to 1 */ +static struct mdt_file_data *mdt_mfd_new(void) +{ + struct mdt_file_data *mfd; + + OBD_ALLOC_PTR(mfd); + if (mfd == NULL) { + CERROR("mds: out of memory\n"); + return NULL; + } + + atomic_set(&mfd->mfd_refcount, 1); + + INIT_LIST_HEAD(&mfd->mfd_handle.h_link); + INIT_LIST_HEAD(&mfd->mfd_list); + class_handle_hash(&mfd->mfd_handle, mdt_mfd_get); + + return mfd; +} + +/* Get a new reference on the mfd pointed to by handle, if handle is still + * valid. Caller must drop reference with mdt_mfd_put(). */ +static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle) +{ + ENTRY; + LASSERT(handle != NULL); + RETURN(class_handle2object(handle->cookie)); +} + +/* Drop mfd reference, freeing struct if this is the last one. */ +static void mdt_mfd_put(struct mdt_file_data *mfd) +{ + CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd, + atomic_read(&mfd->mfd_refcount) - 1); + LASSERT(atomic_read(&mfd->mfd_refcount) > 0 && + atomic_read(&mfd->mfd_refcount) < 0x5a5a); + if (atomic_dec_and_test(&mfd->mfd_refcount)) { + LASSERT(list_empty(&mfd->mfd_handle.h_link)); + OBD_FREE_PTR(mfd); + } +} + +static int mdt_object_open(struct mdt_thread_info *info, + struct mdt_object *o, + int flags) +{ + struct mdt_export_data *med; + struct mdt_file_data *mfd; + struct mdt_body *repbody; + struct lov_mds_md *lmm; + int rc = 0; + ENTRY; + + med = &mdt_info_req(info)->rq_export->exp_mdt_data; + repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD); + + rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), + &info->mti_attr); + if (rc != 0) + GOTO(out, rc); + + mdt_pack_attr2body(repbody, &info->mti_attr); + repbody->fid1 = *mdt_object_fid(o); + repbody->valid |= OBD_MD_FLID; + + rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o), + lmm, info->mti_mdt->mdt_max_mdsize, "lov"); + if (rc < 0) + GOTO(out, rc = -EINVAL); + + if (S_ISDIR(info->mti_attr.la_mode)) + repbody->valid |= OBD_MD_FLDIREA; + else + repbody->valid |= OBD_MD_FLEASIZE; + repbody->eadatasize = rc; + rc = 0; + + mfd = mdt_mfd_new(); + if (mfd == NULL) { + CERROR("mds: out of memory\n"); + GOTO(out, rc = -ENOMEM); + } + + if (flags & FMODE_WRITE) { + /*mds_get_write_access*/ + } else if (flags & MDS_FMODE_EXEC) { + /*mds_deny_write_access*/ + } + + /* keep a reference on this object for this open, + * and is released by mdt_mfd_close() */ + mdt_object_get(info->mti_ctxt, o); + + mfd->mfd_mode = flags; + mfd->mfd_object = o; + mfd->mfd_xid = mdt_info_req(info)->rq_xid; + + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); + + repbody->handle.cookie = mfd->mfd_handle.h_cookie; + + RETURN(rc); +out: + return rc; +} + +int mdt_pin(struct mdt_thread_info* info) +{ + struct mdt_object *o; + int rc; + ENTRY; + + o = mdt_object_find(info->mti_ctxt, info->mti_mdt, &info->mti_body->fid1); + if (!IS_ERR(o)) { + if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) { + rc = mdt_object_open(info, o, info->mti_body->flags); + mdt_object_put(info->mti_ctxt, o); + } else + rc = -ENOENT; + } else + rc = PTR_ERR(o); + + RETURN(rc); +} + +/* Get an internal lock on the inode number (but not generation) to sync + * new inode creation with inode unlink (bug 2029). If child_lockh is NULL + * we just get the lock as a barrier to wait for other holders of this lock, + * and drop it right away again. */ +int mdt_lock_new_child(struct mdt_thread_info *info, + struct mdt_object *o, + struct mdt_lock_handle *child_lockh) +{ + struct mdt_lock_handle lockh; + int rc; + + if (child_lockh == NULL) + child_lockh = &lockh; + + lockh.mlh_mode = LCK_EX; + rc = mdt_object_lock(info->mti_mdt->mdt_namespace, + o, &lockh, MDS_INODELOCK_UPDATE); + + if (rc != ELDLM_OK) + CERROR("can not mdt_object_lock: %d\n", rc); + else if (child_lockh == &lockh) + mdt_object_unlock(info->mti_mdt->mdt_namespace, + o, &lockh); + + RETURN(rc); +} + +int mdt_reint_open(struct mdt_thread_info *info) +{ + struct mdt_device *mdt = info->mti_mdt; + struct mdt_object *parent; + struct mdt_object *child; + struct mdt_lock_handle *lh; + struct ldlm_reply *ldlm_rep; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_body *body; + struct lu_fid child_fid; + int result; + int created = 0; + struct mdt_reint_record *rr = &info->mti_rr; + ENTRY; + + /* we now have no resent message, so it must be an intent */ + LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN); + + /*TODO: MDS_CHECK_RESENT */; + + ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); + body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + + lh = &info->mti_lh[MDT_LH_PARENT]; + lh->mlh_mode = LCK_PW; + parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1, + lh, MDS_INODELOCK_UPDATE); + if (IS_ERR(parent)) + GOTO(out, result = PTR_ERR(parent)); + + result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent), + rr->rr_name, &child_fid); + if (result && result != -ENOENT) { + GOTO(out_parent, result); + } + + intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD); + + if (result == -ENOENT) { + intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG); + if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT)) + GOTO(out_parent, result); + if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) + GOTO(out_parent, result = -EROFS); + child_fid = *info->mti_rr.rr_fid2; + } else { + intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS); + if (info->mti_attr.la_flags & MDS_OPEN_EXCL && + info->mti_attr.la_flags & MDS_OPEN_CREAT) + GOTO(out_parent, result = -EEXIST); + + } + + child = mdt_object_find(info->mti_ctxt, mdt, &child_fid); + if (IS_ERR(child)) + GOTO(out_parent, result = PTR_ERR(child)); + + if (result == -ENOENT) { + /* not found and with MDS_OPEN_CREAT: let's create something */ + result = mdo_create(info->mti_ctxt, + mdt_object_child(parent), + rr->rr_name, + mdt_object_child(child), + &info->mti_attr); + intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE); + if (result != 0) + GOTO(out_child, result); + created = 1; + } else + intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN); + + /* Open it now. */ + result = mdt_object_open(info, child, info->mti_attr.la_flags); + GOTO(destroy_child, result); + +destroy_child: + if (result != 0 && created) { + mdo_unlink(info->mti_ctxt, mdt_object_child(parent), + mdt_object_child(child), rr->rr_name); + } else if (created) { + mdt_lock_new_child(info, child, NULL); + } +out_child: + mdt_object_put(info->mti_ctxt, child); +out_parent: + mdt_object_unlock(mdt->mdt_namespace, parent, lh); + mdt_object_put(info->mti_ctxt, parent); +out: + return result; +} + +int mdt_mfd_close(const struct lu_context *ctxt, + struct mdt_file_data *mfd, + int unlink_orphan) +{ + ENTRY; + + if (mfd->mfd_mode & FMODE_WRITE) { + /*mdt_put_write_access*/ + } else if (mfd->mfd_mode & MDS_FMODE_EXEC) { + /*mdt_allow_write_access*/ + } + + /* release reference on this object. + * it will be destroyed by lower layer if necessary. + */ + mdt_object_put(ctxt, mfd->mfd_object); + + mdt_mfd_put(mfd); + RETURN(0); +} + +int mdt_close(struct mdt_thread_info *info) +{ + struct mdt_export_data *med; + struct mdt_body *repbody; + struct mdt_file_data *mfd; + struct mdt_object *o; + struct lov_mds_md *lmm; + int rc; + ENTRY; + + med = &mdt_info_req(info)->rq_export->exp_mdt_data; + LASSERT(med); + + spin_lock(&med->med_open_lock); + mfd = mdt_handle2mfd(&(info->mti_body->handle)); + if (mfd == NULL) { + spin_unlock(&med->med_open_lock); + CDEBUG(D_INODE, "no handle for file close ino "DFID3 + ": cookie "LPX64, PFID3(&info->mti_body->fid1), + info->mti_body->handle.cookie); + RETURN(-ESTALE); + } + class_handle_unhash(&mfd->mfd_handle); + list_del_init(&mfd->mfd_list); + spin_unlock(&med->med_open_lock); + + o = mfd->mfd_object; + repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD); + + rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), + &info->mti_attr); + if (rc == 0) { + mdt_pack_attr2body(repbody, &info->mti_attr); + repbody->fid1 = *mdt_object_fid(o); + repbody->valid |= OBD_MD_FLID; + + rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o), + lmm, info->mti_mdt->mdt_max_mdsize, "lov"); + if (rc >= 0) { + if (S_ISDIR(info->mti_attr.la_mode)) + repbody->valid |= OBD_MD_FLDIREA; + else + repbody->valid |= OBD_MD_FLEASIZE; + repbody->eadatasize = rc; + rc = 0; + } + } + + rc = mdt_mfd_close(info->mti_ctxt, mfd, 1); + + RETURN(rc); +} + +int mdt_done_writing(struct mdt_thread_info *info) +{ + ENTRY; + + RETURN(0); +} diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 1330dcb..79b98bc 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -37,13 +37,6 @@ #include "mdt_internal.h" -/* object operations */ -static int mdt_md_open(struct mdt_thread_info *info, struct mdt_object *child) -{ - ENTRY; - RETURN(mo_open(info->mti_ctxt, mdt_object_child(child))); -} - static int mdt_md_create(struct mdt_thread_info *info) { struct mdt_device *mdt = info->mti_mdt; @@ -653,131 +646,6 @@ out: RETURN(-EOPNOTSUPP); } -static int mdt_reint_open(struct mdt_thread_info *info) -{ - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *parent; - struct mdt_object *child; - struct mdt_lock_handle *lh; - int result; - struct ldlm_reply *ldlm_rep; - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_body *body = info->mti_reint_rep.mrr_body; - struct lov_mds_md *lmm = info->mti_reint_rep.mrr_md; - struct mdt_reint_record *rr = &info->mti_rr; - int created = 0; - struct lu_fid child_fid; - ENTRY; - - lh = &info->mti_lh[MDT_LH_PARENT]; - lh->mlh_mode = LCK_PW; - - //req_capsule_pack(&info->mti_pill); - ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); - body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - - parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1, - lh, MDS_INODELOCK_UPDATE); - if (IS_ERR(parent)) - RETURN(PTR_ERR(parent)); - - result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent), - rr->rr_name, &child_fid); - if (result && result != -ENOENT) { - GOTO(out_parent, result); - } - - intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD); - - if (result == -ENOENT) { - intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG); - if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT)) - GOTO(out_parent, result); - if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) - GOTO(out_parent, result = -EROFS); - child_fid = *info->mti_rr.rr_fid2; - } else { - intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS); - if (info->mti_attr.la_flags & MDS_OPEN_EXCL && - info->mti_attr.la_flags & MDS_OPEN_CREAT) - GOTO(out_parent, result = -EEXIST); - - } - child = mdt_object_find(info->mti_ctxt, mdt, &child_fid); - if (IS_ERR(child)) - GOTO(out_parent, result = PTR_ERR(child)); - - if (result == -ENOENT) { - /* not found and with MDS_OPEN_CREAT: let's create something */ - result = mdo_create(info->mti_ctxt, - mdt_object_child(parent), - rr->rr_name, - mdt_object_child(child), - &info->mti_attr); - intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE); - if (result != 0) - GOTO(out_child, result); - created = 1; - } else { - result = mo_attr_get(info->mti_ctxt, - mdt_object_child(child), - &info->mti_attr); - if (result != 0) - GOTO(destroy_child, result); - } - - mdt_pack_attr2body(body, &info->mti_attr); - body->fid1 = *mdt_object_fid(child); - body->valid |= OBD_MD_FLID; - - /* we should return "struct lov_mds_md" back*/ - lmm = req_capsule_server_get(&info->mti_pill, - &RMF_MDT_MD); - - /* not supported yet in MDD - result = mo_xattr_get(info->mti_ctxt, mdt_object_child(child), - lmm, MAX_MD_SIZE, "lov"); - if (result < 0) - GOTO(destroy_child, result = -EINVAL); - */ - if (S_ISDIR(info->mti_attr.la_mode)) - body->valid |= OBD_MD_FLDIREA; - else - body->valid |= OBD_MD_FLEASIZE; - body->eadatasize = result; - result = 0; - - /* FIXME Let me fake it until the underlying works */ - lmm->lmm_magic = LOV_MAGIC; /* magic number = LOV_MAGIC_V1 */ - lmm->lmm_pattern = LOV_PATTERN_RAID0; /* LOV_PATTERN_RAID0, LOV_PATTERN_RAID1 */ - lmm->lmm_object_id = 1; /* LOV object ID */ - lmm->lmm_object_gr = 1; /* LOV object group */ - lmm->lmm_stripe_size = 4096 * 1024; /* size of stripe in bytes */ - lmm->lmm_stripe_count = 1; /* num stripes in use for this object */ - /* per-stripe data */ - lmm->lmm_objects[0].l_object_id = 1; /* OST object ID */ - lmm->lmm_objects[0].l_object_gr = 1; /* OST object group (creating MDS number) */ - lmm->lmm_objects[0].l_ost_gen = 1; /* generation of this l_ost_idx */ - lmm->lmm_objects[0].l_ost_idx = 0; /* OST index in LOV (lov_tgt_desc->tgts) */ - body->eadatasize = sizeof(struct lov_mds_md) + sizeof(struct lov_ost_data); - - /* Open it now. */ - /* TODO: not supported yet - result = mdt_md_open(info, child); - */ - -destroy_child: - if (result != 0 && created) - mdo_unlink(info->mti_ctxt, mdt_object_child(parent), - mdt_object_child(child), rr->rr_name); -out_child: - mdt_object_put(info->mti_ctxt, child); -out_parent: - mdt_object_unlock(mdt->mdt_namespace, parent, lh); - mdt_object_put(info->mti_ctxt, parent); - RETURN(result); -} - typedef int (*mdt_reinter)(struct mdt_thread_info *info);