add prototype code for handling last_rcvd.
MODULES := mdt
-mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o
+mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_fs.o mdt_open.o
@INCLUDE_RULES@
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * linux/mdt/mdt_open.c
+ * Lustre Metadata Target (mdt) open/close file handling
+ *
+ * Copyright (C) 2002-2006 Cluster File Systems, Inc.
+ * Author: Huang Hua <huanghua@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+
+/* Add client data to the MDS. We use a bitmap to locate a free space
+ * in the last_rcvd file if cl_off is -1 (i.e. a new client).
+ * Otherwise, we just have to read the data from the last_rcvd file and
+ * we know its offset.
+ *
+ * It should not be possible to fail adding an existing client - otherwise
+ * mdt_init_server_data() callsite needs to be fixed.
+ */
+int mdt_client_add(const struct lu_context *ctxt,
+ struct mdt_device *mdt,
+ struct mdt_export_data *med,
+ int cl_idx)
+{
+ unsigned long *bitmap = mdt->mdt_client_bitmap;
+ struct mdt_client_data *mcd = med->med_mcd;
+ struct mdt_server_data *msd = &mdt->mdt_msd;
+ int new_client = (cl_idx == -1);
+ ENTRY;
+
+ LASSERT(bitmap != NULL);
+ LASSERTF(cl_idx > -2, "%d\n", cl_idx);
+
+ /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
+ * there's no need for extra complication here
+ */
+ if (new_client) {
+ cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
+ repeat:
+ if (cl_idx >= LR_MAX_CLIENTS ||
+ OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
+ CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
+ return -EOVERFLOW;
+ }
+ if (test_and_set_bit(cl_idx, bitmap)) {
+ cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
+ cl_idx);
+ goto repeat;
+ }
+ } else {
+ if (test_and_set_bit(cl_idx, bitmap)) {
+ CERROR("MDS client %d: bit already set in bitmap!!\n",
+ cl_idx);
+ LBUG();
+ }
+ }
+
+ CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
+ cl_idx, med->med_mcd->mcd_uuid);
+
+ med->med_lr_idx = cl_idx;
+ med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
+ (cl_idx * le16_to_cpu(msd->msd_client_size));
+ LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
+
+ if (new_client) {
+ loff_t off = med->med_lr_off;
+ int rc = 0;
+
+ rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt,
+ mdt->mdt_last,
+ mcd, sizeof(*mcd),
+ &off);
+
+ if (rc)
+ return rc;
+ CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
+ cl_idx, off, sizeof(mcd));
+ }
+ return 0;
+}
+
+int mdt_update_server_data(const struct lu_context *ctxt,
+ struct mdt_device *mdt,
+ int sync)
+{
+ struct mdt_server_data *msd = &mdt->mdt_msd;
+ loff_t off = 0;
+ int rc = 0;
+ ENTRY;
+
+ CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
+ mdt->mdt_mount_count, mdt->mdt_last_transno);
+
+ msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
+ rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt,
+ mdt->mdt_last,
+ msd,
+ sizeof(*msd), &off);
+ RETURN(rc);
+
+}
+
+int mdt_client_free(const struct lu_context *ctxt,
+ struct mdt_device *mdt,
+ struct mdt_export_data *med)
+{
+ struct mdt_client_data *mcd = med->med_mcd;
+ int rc;
+ loff_t off;
+ ENTRY;
+
+ if (!mcd)
+ RETURN(0);
+
+ CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
+ med->med_lr_idx, med->med_lr_off, mcd->mcd_uuid);
+
+ off = med->med_lr_off;
+
+ /* Don't clear med_lr_idx here as it is likely also unset. At worst
+ * we leak a client slot that will be cleaned on the next recovery. */
+ if (off <= 0) {
+ CERROR("client idx %d has offset %lld\n",
+ med->med_lr_idx, off);
+ GOTO(free, rc = -EINVAL);
+ }
+
+ /* Clear the bit _after_ zeroing out the client so we don't
+ race with mdt_client_add and zero out new clients.*/
+ if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
+ CERROR("MDT client %u: bit already clear in bitmap!!\n",
+ med->med_lr_idx);
+ LBUG();
+ }
+
+// if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
+ memset(&mcd, 0, sizeof *mcd);
+ rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt,
+ mdt->mdt_last,
+ mcd,
+ sizeof(*mcd), &off);
+ CDEBUG_EX(rc == 0 ? D_INFO : D_ERROR,
+ "zeroing out client idx %u in %s rc %d\n",
+ med->med_lr_idx, LAST_RCVD, rc);
+
+ if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
+ CERROR("MDS client %u: bit already clear in bitmap!!\n",
+ med->med_lr_idx);
+ LBUG();
+ }
+
+ /* Make sure the server's last_transno is up to date. Do this
+ * after the client is freed so we know all the client's
+ * transactions have been committed. */
+ mdt_update_server_data(ctxt, mdt, 0);
+
+ EXIT;
+free:
+ OBD_FREE_PTR(mcd);
+ med->med_mcd = NULL;
+ return 0;
+}
+
+static int mdt_init_server_data(const struct lu_context *ctxt,
+ struct mdt_device *mdt)
+{
+ struct mdt_server_data *msd = &mdt->mdt_msd;
+ struct mdt_client_data *mcd = NULL;
+ struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+ loff_t off = 0;
+ unsigned long last_rcvd_size = 0; // = getsize(mdt->mdt_last)
+ __u64 mount_count;
+ int cl_idx;
+ int rc = 0;
+ ENTRY;
+
+ /* ensure padding in the struct is the correct size */
+ LASSERT(offsetof(struct mdt_server_data, msd_padding) +
+ sizeof(msd->msd_padding) == LR_SERVER_SIZE);
+ LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
+ sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
+
+ if (last_rcvd_size == 0) {
+ LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
+
+ memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
+ msd->msd_last_transno = 0;
+ mount_count = msd->msd_mount_count = 0;
+ msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
+ msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
+ msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
+ msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
+ msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
+ OBD_INCOMPAT_COMMON_LR);
+ } else {
+ rc = mdt->mdt_last->do_body_ops->dbo_read(ctxt,
+ mdt->mdt_last,
+ msd,
+ sizeof(*msd), &off);
+ if (rc) {
+ CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
+ GOTO(out, rc);
+ }
+ if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
+ LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
+ " disk %s. Were the /dev/ assignments "
+ "rearranged?\n",
+ obd->obd_uuid.uuid, msd->msd_uuid);
+ GOTO(out, rc = -EINVAL);
+ }
+ mount_count = le64_to_cpu(msd->msd_mount_count);
+ }
+
+ if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
+ CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+ obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
+ ~MDT_INCOMPAT_SUPP);
+ GOTO(out, rc = -EINVAL);
+ }
+ if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
+ CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+ obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
+ ~MDT_ROCOMPAT_SUPP);
+ /* Do something like remount filesystem read-only */
+ GOTO(out, rc = -EINVAL);
+ }
+ if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
+ CDEBUG(D_WARNING, "using old last_rcvd format\n");
+ msd->msd_mount_count = msd->msd_last_transno;
+ msd->msd_last_transno = msd->msd_unused;
+ /* If we update the last_rcvd, we can never go back to
+ an old install, so leave this in the old format for now.
+ msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
+ */
+ }
+ msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
+
+ mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
+
+ CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+ obd->obd_name, mdt->mdt_last_transno);
+ CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+ obd->obd_name, mount_count + 1);
+ CDEBUG(D_INODE, "%s: server data size: %u\n",
+ obd->obd_name, le32_to_cpu(msd->msd_server_size));
+ CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+ obd->obd_name, le32_to_cpu(msd->msd_client_start));
+ CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+ obd->obd_name, le32_to_cpu(msd->msd_client_size));
+ CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
+ obd->obd_name, last_rcvd_size);
+ CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
+ last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
+ (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
+ le16_to_cpu(msd->msd_client_size));
+
+ if (!msd->msd_server_size || !msd->msd_client_start ||
+ !msd->msd_client_size) {
+ CERROR("Bad last_rcvd contents!\n");
+ GOTO(out, rc = -EINVAL);
+ }
+
+ /* When we do a clean MDS shutdown, we save the last_transno into
+ * the header. If we find clients with higher last_transno values
+ * then those clients may need recovery done. */
+ for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
+ off < last_rcvd_size; cl_idx++) {
+ __u64 last_transno;
+ struct obd_export *exp;
+ struct mdt_export_data *med;
+
+ if (!mcd) {
+ OBD_ALLOC_PTR(mcd);
+ if (!mcd)
+ GOTO(err_client, rc = -ENOMEM);
+ }
+
+ off = le32_to_cpu(msd->msd_client_start) +
+ cl_idx * le16_to_cpu(msd->msd_client_size);
+ rc = mdt->mdt_last->do_body_ops->dbo_read(ctxt,
+ mdt->mdt_last,
+ mcd,
+ sizeof(*mcd), &off);
+ if (rc) {
+ CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
+ LAST_RCVD, cl_idx, off, rc);
+ break; /* read error shouldn't cause startup to fail */
+ }
+
+ if (mcd->mcd_uuid[0] == '\0') {
+ CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+ cl_idx);
+ continue;
+ }
+
+ last_transno = le64_to_cpu(mcd->mcd_last_transno);
+
+ /* These exports are cleaned up by mdt_obd_disconnect(), so they
+ * need to be set up like real exports as mdt_obd_connect() does.
+ */
+ CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+ " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
+ last_transno, le64_to_cpu(msd->msd_last_transno),
+ le64_to_cpu(mcd->mcd_last_xid));
+
+ exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
+ if (IS_ERR(exp))
+ GOTO(err_client, rc = PTR_ERR(exp));
+
+ med = &exp->exp_mdt_data;
+ med->med_mcd = mcd;
+ rc = mdt_client_add(ctxt, mdt, med, cl_idx);
+ LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+
+ mcd = NULL;
+ exp->exp_replay_needed = 1;
+ exp->exp_connecting = 0;
+ obd->obd_recoverable_clients++;
+ obd->obd_max_recoverable_clients++;
+ class_export_put(exp);
+
+ CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
+ cl_idx, last_transno);
+
+ if (last_transno > mdt->mdt_last_transno)
+ mdt->mdt_last_transno = last_transno;
+ }
+
+ if (mcd)
+ OBD_FREE_PTR(mcd);
+
+ obd->obd_last_committed = mdt->mdt_last_transno;
+
+ if (obd->obd_recoverable_clients) {
+ CWARN("RECOVERY: service %s, %d recoverable clients, "
+ "last_transno "LPU64"\n", obd->obd_name,
+ obd->obd_recoverable_clients, mdt->mdt_last_transno);
+ obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+ obd->obd_recovering = 1;
+ obd->obd_recovery_start = CURRENT_SECONDS;
+ /* Only used for lprocfs_status */
+ obd->obd_recovery_end = obd->obd_recovery_start +
+ OBD_RECOVERY_TIMEOUT;
+ }
+
+ mdt->mdt_mount_count = mount_count + 1;
+ msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
+
+ /* save it, so mount count and last_transno is current */
+ rc = mdt_update_server_data(ctxt, mdt, 1);
+ if (rc)
+ GOTO(err_client, rc);
+
+ RETURN(0);
+
+err_client:
+ class_disconnect_exports(obd);
+out:
+ return rc;
+}
+
+int mdt_fs_setup(const struct lu_context *ctxt,
+ struct mdt_device *mdt)
+{
+ struct lu_fid last_fid;
+ struct dt_object *last;
+ int rc;
+ ENTRY;
+
+ last = dt_store_open(ctxt, mdt->mdt_bottom, LAST_RCVD, &last_fid);
+ if(!IS_ERR(last)) {
+ mdt->mdt_last = last;
+ rc = mdt_init_server_data(ctxt, mdt);
+ if (rc) {
+ lu_object_put(ctxt, &last->do_lu);
+ mdt->mdt_last = NULL;
+ }
+ } else {
+ CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+ rc = PTR_ERR(last);
+ }
+ return rc;
+}
+
+
+void mdt_fs_cleanup(const struct lu_context *ctxt,
+ struct mdt_device *mdt)
+{
+ struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+
+ class_disconnect_exports(obd); /* cleans up client info too */
+
+ if (mdt->mdt_last)
+ lu_object_put(ctxt, &mdt->mdt_last->do_lu);
+ mdt->mdt_last = NULL;
+}
+
RETURN(rc);
}
-static int mdt_close(struct mdt_thread_info *info)
-{
-#ifdef MDT_CODE
- /* TODO: dual to open handling, orphan handling */
- struct mdt_body * reqbody;
- struct mdt_body * repbody;
-
- reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
- repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-
-#endif
- return -EOPNOTSUPP;
-}
-
-static int mdt_done_writing(struct mdt_thread_info *info)
-{
- return -EOPNOTSUPP;
-}
-
-static int mdt_pin(struct mdt_thread_info *info)
-{
-#ifdef MDT_CODE
- /* TODO: This is open handling. */
-#endif
- return -EOPNOTSUPP;
-}
-
#ifdef MDT_CODE
/* TODO these two methods not available now. */
return mdt_obj(o);
}
-void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *o)
-{
- lu_object_put(ctxt, &o->mot_obj.mo_lu);
-}
-
-const struct lu_fid *mdt_object_fid(struct mdt_object *o)
-{
- return lu_object_fid(&o->mot_obj.mo_lu);
-}
-
int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
struct mdt_lock_handle *lh, __u64 ibits)
{
struct lu_site *ls = d->ld_site;
ENTRY;
+
+ mdt_fs_cleanup(ctx, m);
mdt_stop_ptlrpc_service(m);
if (rc)
GOTO(err_free_ns, rc);
- RETURN(0);
+ rc = mdt_fs_setup(ctx, m);
+ if (rc)
+ GOTO(err_stop_service, rc);
+ RETURN(0);
+err_stop_service:
+ mdt_stop_ptlrpc_service(m);
err_free_ns:
ldlm_namespace_free(m->mdt_namespace, 0);
m->mdt_namespace = NULL;
EXIT;
}
-static int mdt_object_exists(const struct lu_context *ctx,
- const struct lu_object *o)
-{
- return lu_object_exists(ctx, lu_object_next(o));
-}
-
static int mdt_object_print(const struct lu_context *ctxt,
struct seq_file *f, const struct lu_object *o)
{
return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
}
+int mdt_object_exists(const struct lu_context *ctx,
+ const struct lu_object *o)
+{
+ return lu_object_exists(ctx, lu_object_next(o));
+}
+
static struct lu_device_operations mdt_lu_ops = {
.ldo_object_alloc = mdt_object_alloc,
.ldo_process_config = mdt_process_config
RETURN(rc);
}
+static int mdt_init_export(struct obd_export *exp)
+{
+ struct mdt_export_data *med = &exp->exp_mdt_data;
+
+ INIT_LIST_HEAD(&med->med_open_head);
+ spin_lock_init(&med->med_open_lock);
+ exp->exp_connecting = 1;
+ RETURN(0);
+}
+
+static int mdt_destroy_export(struct obd_export *export)
+{
+ struct mdt_export_data *med;
+ struct obd_device *obd = export->exp_obd;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ struct lu_context ctxt;
+ int rc = 0;
+ ENTRY;
+
+ med = &export->exp_mdt_data;
+ target_destroy_export(export);
+
+ if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
+ RETURN(0);
+
+ rc = lu_context_init(&ctxt);
+ if (rc)
+ RETURN(rc);
+
+ lu_context_enter(&ctxt);
+ /* Close any open files (which may also cause orphan unlinking). */
+ spin_lock(&med->med_open_lock);
+ while (!list_empty(&med->med_open_head)) {
+ struct list_head *tmp = med->med_open_head.next;
+ struct mdt_file_data *mfd =
+ list_entry(tmp, struct mdt_file_data, mfd_list);
+
+ /* Remove mfd handle so it can't be found again.
+ * We are consuming the mfd_list reference here. */
+ class_handle_unhash(&mfd->mfd_handle);
+ list_del_init(&mfd->mfd_list);
+ spin_unlock(&med->med_open_lock);
+
+ rc = mdt_mfd_close(&ctxt, mfd, 0);
+
+ if (rc)
+ CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
+ spin_lock(&med->med_open_lock);
+ }
+ spin_unlock(&med->med_open_lock);
+ mdt_client_free(&ctxt, mdt, med);
+
+ lu_context_exit(&ctxt);
+ lu_context_fini(&ctxt);
+
+ RETURN(rc);
+}
+
static int mdt_notify(struct obd_device *obd, struct obd_device *watched,
enum obd_notify_event ev, void *data)
{
}
static struct obd_ops mdt_obd_device_ops = {
- .o_owner = THIS_MODULE,
- .o_connect = mdt_obd_connect,
- .o_disconnect = mdt_obd_disconnect,
- .o_notify = mdt_notify,
+ .o_owner = THIS_MODULE,
+ .o_connect = mdt_obd_connect,
+ .o_disconnect = mdt_obd_disconnect,
+ .o_init_export = mdt_init_export, /* By Huang Hua*/
+ .o_destroy_export = mdt_destroy_export, /* By Huang Hua*/
+ .o_notify = mdt_notify,
};
static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
* Author: Phil Schwan <phil@clusterfs.com>
* Author: Mike Shaver <shaver@clusterfs.com>
* Author: Nikita Danilov <nikita@clusterfs.com>
+ * Author: Huang Hua <huanghua@clusterfs.com>
*
* This file is part of the Lustre file system, http://www.lustre.org
* Lustre is a trademark of Cluster File Systems, Inc.
*/
#include <lustre/lustre_idl.h>
#include <md_object.h>
+#include <dt_object.h>
#include <lustre_fid.h>
#include <lustre_fld.h>
#include <lustre_req_layout.h>
/* LR_CLIENT_SIZE, etc. */
#include <lustre_disk.h>
+
+/* Data stored per client in the last_rcvd file. In le32 order. */
+struct mdt_client_data {
+ __u8 mcd_uuid[40]; /* client UUID */
+ __u64 mcd_last_transno; /* last completed transaction ID */
+ __u64 mcd_last_xid; /* xid for the last transaction */
+ __u32 mcd_last_result; /* result from last RPC */
+ __u32 mcd_last_data; /* per-op data (disposition for open &c.) */
+ __u8 mcd_padding[LR_CLIENT_SIZE - 64];
+};
+
+/* copied from lr_server_data.
+ * mds data stored at the head of last_rcvd file. In le32 order. */
+struct mdt_server_data {
+ __u8 msd_uuid[40]; /* server UUID */
+ __u64 msd_unused; /* was fsd_last_objid - don't use for now */
+ __u64 msd_last_transno; /* last completed transaction ID */
+ __u64 msd_mount_count; /* incarnation number */
+ __u32 msd_feature_compat; /* compatible feature flags */
+ __u32 msd_feature_rocompat;/* read-only compatible feature flags */
+ __u32 msd_feature_incompat;/* incompatible feature flags */
+ __u32 msd_server_size; /* size of server data area */
+ __u32 msd_client_start; /* start of per-client data area */
+ __u16 msd_client_size; /* size of per-client data area */
+ __u16 msd_subdir_count; /* number of subdirectories for objects */
+ __u64 msd_catalog_oid; /* recovery catalog object id */
+ __u32 msd_catalog_ogen; /* recovery catalog inode generation */
+ __u8 msd_peeruuid[40]; /* UUID of MDS associated with this OST */
+ __u32 msd_ost_index; /* index number of OST in LOV */
+ __u32 msd_mdt_index; /* index number of MDT in LMV */
+ __u8 msd_padding[LR_SERVER_SIZE - 148];
+};
+
+struct mdt_object;
+/* file data for open files on MDS */
+struct mdt_file_data {
+ struct portals_handle mfd_handle; /* must be first */
+ atomic_t mfd_refcount;
+ struct list_head mfd_list; /* protected by med_open_lock */
+ __u64 mfd_xid;
+ int mfd_mode;
+ struct mdt_object *mfd_object;
+};
+
struct mdt_device {
/* super-class */
struct md_device mdt_md_dev;
* or should be placed somewhere else. */
int mdt_max_mdsize;
int mdt_max_cookiesize;
-
+ __u64 mdt_mount_count;
+
+ struct mdt_server_data mdt_msd;
+ unsigned long mdt_client_bitmap[LR_MAX_CLIENTS / sizeof(long)];
+ struct dt_object *mdt_last;
};
-/* Data stored per client in the last_rcvd file. In le32 order. */
-struct mdt_client_data {
- __u8 mcd_uuid[40]; /* client UUID */
- __u64 mcd_last_transno; /* last completed transaction ID */
- __u64 mcd_last_xid; /* xid for the last transaction */
- __u32 mcd_last_result; /* result from last RPC */
- __u32 mcd_last_data; /* per-op data (disposition for open &c.) */
- __u8 mcd_padding[LR_CLIENT_SIZE - 64];
-};
-#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
-static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
-{
- LASSERT(m->mdt_child);
- return m->mdt_child->md_ops;
-}
+/*XXX copied from mds_internal.h */
+#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
+#define MDT_ROCOMPAT_SUPP (OBD_ROCOMPAT_LOVOBJID)
+#define MDT_INCOMPAT_SUPP (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
enum mdt_flags {
/*
struct md_object mot_obj;
};
-static inline struct md_object *mdt_object_child(struct mdt_object *o)
-{
- return lu2md(lu_object_next(&o->mot_obj.mo_lu));
-}
-
struct mdt_lock_handle {
struct lustre_handle mlh_lh;
ldlm_mode_t mlh_mode;
};
-void mdt_lock_handle_init(struct mdt_lock_handle *lh);
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
-
-int md_device_init(struct md_device *md, struct lu_device_type *t);
-void md_device_fini(struct md_device *md);
-
enum {
MDT_REP_BUF_NR_MAX = 8
};
struct mdt_reint_reply mti_reint_rep;
};
-int fid_lock(struct ldlm_namespace *, const struct lu_fid *,
- struct lustre_handle *, ldlm_mode_t,
- ldlm_policy_data_t *);
+static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
+{
+ LASSERT(m->mdt_child);
+ return m->mdt_child->md_ops;
+}
+
+static inline struct md_object *mdt_object_child(struct mdt_object *o)
+{
+ return lu2md(lu_object_next(&o->mot_obj.mo_lu));
+}
+
+static inline struct ptlrpc_request *mdt_info_req(struct mdt_thread_info *info)
+{
+ return info->mti_pill.rc_req;
+}
+
+static inline void mdt_object_get(const struct lu_context *ctxt,
+ struct mdt_object *o)
+{
+ lu_object_get(&o->mot_obj.mo_lu);
+}
-void fid_unlock(struct ldlm_namespace *, const struct lu_fid *,
- struct lustre_handle *, ldlm_mode_t);
+static inline void mdt_object_put(const struct lu_context *ctxt,
+ struct mdt_object *o)
+{
+ lu_object_put(ctxt, &o->mot_obj.mo_lu);
+}
-struct mdt_object *mdt_object_find(const struct lu_context *,
- struct mdt_device *, const struct lu_fid *);
-void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *);
+static inline const struct lu_fid *mdt_object_fid(struct mdt_object *o)
+{
+ return lu_object_fid(&o->mot_obj.mo_lu);
+}
-int mdt_object_lock(struct ldlm_namespace *, struct mdt_object *,
- struct mdt_lock_handle *, __u64);
+int mdt_object_lock(struct ldlm_namespace *,
+ struct mdt_object *,
+ struct mdt_lock_handle *,
+ __u64);
-void mdt_object_unlock(struct ldlm_namespace *, struct mdt_object *,
+void mdt_object_unlock(struct ldlm_namespace *,
+ struct mdt_object *,
struct mdt_lock_handle *);
+struct mdt_object *mdt_object_find(const struct lu_context *,
+ struct mdt_device *,
+ const struct lu_fid *);
struct mdt_object *mdt_object_find_lock(const struct lu_context *,
struct mdt_device *,
const struct lu_fid *,
int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
int mdt_reint_rec(struct mdt_thread_info *);
void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr);
-const struct lu_fid *mdt_object_fid(struct mdt_object *o);
-static inline struct ptlrpc_request *mdt_info_req (struct mdt_thread_info *info)
-{
- return info->mti_pill.rc_req;
-}
+
int mdt_getxattr(struct mdt_thread_info *info);
int mdt_setxattr(struct mdt_thread_info *info);
+void mdt_lock_handle_init(struct mdt_lock_handle *lh);
+void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
+
+
+int mdt_object_exists(const struct lu_context *ctx,
+ const struct lu_object *o);
+
+int mdt_fs_setup(const struct lu_context *ctxt,
+ struct mdt_device *mdt);
+
+void mdt_fs_cleanup(const struct lu_context *ctxt,
+ struct mdt_device *mdt);
+
+int mdt_client_free(const struct lu_context *ctxt,
+ struct mdt_device *mdt,
+ struct mdt_export_data *med);
+
+int mdt_update_server_data(const struct lu_context *ctxt,
+ struct mdt_device *mdt,
+ int sync);
+
+int mdt_client_add(const struct lu_context *ctxt,
+ struct mdt_device *mdt,
+ struct mdt_export_data *med,
+ int cl_idx);
+
+int mdt_pin(struct mdt_thread_info* info);
+
+int mdt_lock_new_child(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *child_lockh);
+
+int mdt_reint_open(struct mdt_thread_info *info);
+
+int mdt_mfd_close(const struct lu_context *ctxt,
+ struct mdt_file_data *mfd,
+ int unlink_orphan);
+
+int mdt_close(struct mdt_thread_info *info);
+
+int mdt_done_writing(struct mdt_thread_info *info);
+
+
+
#endif /* __KERNEL__ */
#endif /* _MDT_H */
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * linux/mdt/mdt_open.c
+ * Lustre Metadata Target (mdt) open/close file handling
+ *
+ * Copyright (C) 2002-2006 Cluster File Systems, Inc.
+ * Author: Huang Hua <huanghua@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+
+/*
+ * MDS file data handling: file data holds a handle for a file opened
+ * by a client.
+ */
+
+static void mdt_mfd_get(void *mfdp)
+{
+ struct mdt_file_data *mfd = mfdp;
+
+ atomic_inc(&mfd->mfd_refcount);
+ CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
+ atomic_read(&mfd->mfd_refcount));
+}
+
+/* Create a new mdt_file_data struct.
+ * reference is set to 1 */
+static struct mdt_file_data *mdt_mfd_new(void)
+{
+ struct mdt_file_data *mfd;
+
+ OBD_ALLOC_PTR(mfd);
+ if (mfd == NULL) {
+ CERROR("mds: out of memory\n");
+ return NULL;
+ }
+
+ atomic_set(&mfd->mfd_refcount, 1);
+
+ INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
+ INIT_LIST_HEAD(&mfd->mfd_list);
+ class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
+
+ return mfd;
+}
+
+/* Get a new reference on the mfd pointed to by handle, if handle is still
+ * valid. Caller must drop reference with mdt_mfd_put(). */
+static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
+{
+ ENTRY;
+ LASSERT(handle != NULL);
+ RETURN(class_handle2object(handle->cookie));
+}
+
+/* Drop mfd reference, freeing struct if this is the last one. */
+static void mdt_mfd_put(struct mdt_file_data *mfd)
+{
+ CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
+ atomic_read(&mfd->mfd_refcount) - 1);
+ LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
+ atomic_read(&mfd->mfd_refcount) < 0x5a5a);
+ if (atomic_dec_and_test(&mfd->mfd_refcount)) {
+ LASSERT(list_empty(&mfd->mfd_handle.h_link));
+ OBD_FREE_PTR(mfd);
+ }
+}
+
+static int mdt_object_open(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ int flags)
+{
+ struct mdt_export_data *med;
+ struct mdt_file_data *mfd;
+ struct mdt_body *repbody;
+ struct lov_mds_md *lmm;
+ int rc = 0;
+ ENTRY;
+
+ med = &mdt_info_req(info)->rq_export->exp_mdt_data;
+ repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+
+ rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
+ &info->mti_attr);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ mdt_pack_attr2body(repbody, &info->mti_attr);
+ repbody->fid1 = *mdt_object_fid(o);
+ repbody->valid |= OBD_MD_FLID;
+
+ rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
+ lmm, info->mti_mdt->mdt_max_mdsize, "lov");
+ if (rc < 0)
+ GOTO(out, rc = -EINVAL);
+
+ if (S_ISDIR(info->mti_attr.la_mode))
+ repbody->valid |= OBD_MD_FLDIREA;
+ else
+ repbody->valid |= OBD_MD_FLEASIZE;
+ repbody->eadatasize = rc;
+ rc = 0;
+
+ mfd = mdt_mfd_new();
+ if (mfd == NULL) {
+ CERROR("mds: out of memory\n");
+ GOTO(out, rc = -ENOMEM);
+ }
+
+ if (flags & FMODE_WRITE) {
+ /*mds_get_write_access*/
+ } else if (flags & MDS_FMODE_EXEC) {
+ /*mds_deny_write_access*/
+ }
+
+ /* keep a reference on this object for this open,
+ * and is released by mdt_mfd_close() */
+ mdt_object_get(info->mti_ctxt, o);
+
+ mfd->mfd_mode = flags;
+ mfd->mfd_object = o;
+ mfd->mfd_xid = mdt_info_req(info)->rq_xid;
+
+ spin_lock(&med->med_open_lock);
+ list_add(&mfd->mfd_list, &med->med_open_head);
+ spin_unlock(&med->med_open_lock);
+
+ repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+
+ RETURN(rc);
+out:
+ return rc;
+}
+
+int mdt_pin(struct mdt_thread_info* info)
+{
+ struct mdt_object *o;
+ int rc;
+ ENTRY;
+
+ o = mdt_object_find(info->mti_ctxt, info->mti_mdt, &info->mti_body->fid1);
+ if (!IS_ERR(o)) {
+ if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
+ rc = mdt_object_open(info, o, info->mti_body->flags);
+ mdt_object_put(info->mti_ctxt, o);
+ } else
+ rc = -ENOENT;
+ } else
+ rc = PTR_ERR(o);
+
+ RETURN(rc);
+}
+
+/* Get an internal lock on the inode number (but not generation) to sync
+ * new inode creation with inode unlink (bug 2029). If child_lockh is NULL
+ * we just get the lock as a barrier to wait for other holders of this lock,
+ * and drop it right away again. */
+int mdt_lock_new_child(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *child_lockh)
+{
+ struct mdt_lock_handle lockh;
+ int rc;
+
+ if (child_lockh == NULL)
+ child_lockh = &lockh;
+
+ lockh.mlh_mode = LCK_EX;
+ rc = mdt_object_lock(info->mti_mdt->mdt_namespace,
+ o, &lockh, MDS_INODELOCK_UPDATE);
+
+ if (rc != ELDLM_OK)
+ CERROR("can not mdt_object_lock: %d\n", rc);
+ else if (child_lockh == &lockh)
+ mdt_object_unlock(info->mti_mdt->mdt_namespace,
+ o, &lockh);
+
+ RETURN(rc);
+}
+
+int mdt_reint_open(struct mdt_thread_info *info)
+{
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_object *parent;
+ struct mdt_object *child;
+ struct mdt_lock_handle *lh;
+ struct ldlm_reply *ldlm_rep;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_body *body;
+ struct lu_fid child_fid;
+ int result;
+ int created = 0;
+ struct mdt_reint_record *rr = &info->mti_rr;
+ ENTRY;
+
+ /* we now have no resent message, so it must be an intent */
+ LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
+
+ /*TODO: MDS_CHECK_RESENT */;
+
+ ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ lh->mlh_mode = LCK_PW;
+ parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
+ lh, MDS_INODELOCK_UPDATE);
+ if (IS_ERR(parent))
+ GOTO(out, result = PTR_ERR(parent));
+
+ result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
+ rr->rr_name, &child_fid);
+ if (result && result != -ENOENT) {
+ GOTO(out_parent, result);
+ }
+
+ intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
+ if (result == -ENOENT) {
+ intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
+ if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
+ GOTO(out_parent, result);
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_parent, result = -EROFS);
+ child_fid = *info->mti_rr.rr_fid2;
+ } else {
+ intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
+ if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
+ info->mti_attr.la_flags & MDS_OPEN_CREAT)
+ GOTO(out_parent, result = -EEXIST);
+
+ }
+
+ child = mdt_object_find(info->mti_ctxt, mdt, &child_fid);
+ if (IS_ERR(child))
+ GOTO(out_parent, result = PTR_ERR(child));
+
+ if (result == -ENOENT) {
+ /* not found and with MDS_OPEN_CREAT: let's create something */
+ result = mdo_create(info->mti_ctxt,
+ mdt_object_child(parent),
+ rr->rr_name,
+ mdt_object_child(child),
+ &info->mti_attr);
+ intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
+ if (result != 0)
+ GOTO(out_child, result);
+ created = 1;
+ } else
+ intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+
+ /* Open it now. */
+ result = mdt_object_open(info, child, info->mti_attr.la_flags);
+ GOTO(destroy_child, result);
+
+destroy_child:
+ if (result != 0 && created) {
+ mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
+ mdt_object_child(child), rr->rr_name);
+ } else if (created) {
+ mdt_lock_new_child(info, child, NULL);
+ }
+out_child:
+ mdt_object_put(info->mti_ctxt, child);
+out_parent:
+ mdt_object_unlock(mdt->mdt_namespace, parent, lh);
+ mdt_object_put(info->mti_ctxt, parent);
+out:
+ return result;
+}
+
+int mdt_mfd_close(const struct lu_context *ctxt,
+ struct mdt_file_data *mfd,
+ int unlink_orphan)
+{
+ ENTRY;
+
+ if (mfd->mfd_mode & FMODE_WRITE) {
+ /*mdt_put_write_access*/
+ } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
+ /*mdt_allow_write_access*/
+ }
+
+ /* release reference on this object.
+ * it will be destroyed by lower layer if necessary.
+ */
+ mdt_object_put(ctxt, mfd->mfd_object);
+
+ mdt_mfd_put(mfd);
+ RETURN(0);
+}
+
+int mdt_close(struct mdt_thread_info *info)
+{
+ struct mdt_export_data *med;
+ struct mdt_body *repbody;
+ struct mdt_file_data *mfd;
+ struct mdt_object *o;
+ struct lov_mds_md *lmm;
+ int rc;
+ ENTRY;
+
+ med = &mdt_info_req(info)->rq_export->exp_mdt_data;
+ LASSERT(med);
+
+ spin_lock(&med->med_open_lock);
+ mfd = mdt_handle2mfd(&(info->mti_body->handle));
+ if (mfd == NULL) {
+ spin_unlock(&med->med_open_lock);
+ CDEBUG(D_INODE, "no handle for file close ino "DFID3
+ ": cookie "LPX64, PFID3(&info->mti_body->fid1),
+ info->mti_body->handle.cookie);
+ RETURN(-ESTALE);
+ }
+ class_handle_unhash(&mfd->mfd_handle);
+ list_del_init(&mfd->mfd_list);
+ spin_unlock(&med->med_open_lock);
+
+ o = mfd->mfd_object;
+ repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+
+ rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
+ &info->mti_attr);
+ if (rc == 0) {
+ mdt_pack_attr2body(repbody, &info->mti_attr);
+ repbody->fid1 = *mdt_object_fid(o);
+ repbody->valid |= OBD_MD_FLID;
+
+ rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
+ lmm, info->mti_mdt->mdt_max_mdsize, "lov");
+ if (rc >= 0) {
+ if (S_ISDIR(info->mti_attr.la_mode))
+ repbody->valid |= OBD_MD_FLDIREA;
+ else
+ repbody->valid |= OBD_MD_FLEASIZE;
+ repbody->eadatasize = rc;
+ rc = 0;
+ }
+ }
+
+ rc = mdt_mfd_close(info->mti_ctxt, mfd, 1);
+
+ RETURN(rc);
+}
+
+int mdt_done_writing(struct mdt_thread_info *info)
+{
+ ENTRY;
+
+ RETURN(0);
+}
#include "mdt_internal.h"
-/* object operations */
-static int mdt_md_open(struct mdt_thread_info *info, struct mdt_object *child)
-{
- ENTRY;
- RETURN(mo_open(info->mti_ctxt, mdt_object_child(child)));
-}
-
static int mdt_md_create(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
RETURN(-EOPNOTSUPP);
}
-static int mdt_reint_open(struct mdt_thread_info *info)
-{
- struct mdt_device *mdt = info->mti_mdt;
- struct mdt_object *parent;
- struct mdt_object *child;
- struct mdt_lock_handle *lh;
- int result;
- struct ldlm_reply *ldlm_rep;
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_body *body = info->mti_reint_rep.mrr_body;
- struct lov_mds_md *lmm = info->mti_reint_rep.mrr_md;
- struct mdt_reint_record *rr = &info->mti_rr;
- int created = 0;
- struct lu_fid child_fid;
- ENTRY;
-
- lh = &info->mti_lh[MDT_LH_PARENT];
- lh->mlh_mode = LCK_PW;
-
- //req_capsule_pack(&info->mti_pill);
- ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
- body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-
- parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
- lh, MDS_INODELOCK_UPDATE);
- if (IS_ERR(parent))
- RETURN(PTR_ERR(parent));
-
- result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
- rr->rr_name, &child_fid);
- if (result && result != -ENOENT) {
- GOTO(out_parent, result);
- }
-
- intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
-
- if (result == -ENOENT) {
- intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
- if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
- GOTO(out_parent, result);
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- GOTO(out_parent, result = -EROFS);
- child_fid = *info->mti_rr.rr_fid2;
- } else {
- intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
- if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
- info->mti_attr.la_flags & MDS_OPEN_CREAT)
- GOTO(out_parent, result = -EEXIST);
-
- }
- child = mdt_object_find(info->mti_ctxt, mdt, &child_fid);
- if (IS_ERR(child))
- GOTO(out_parent, result = PTR_ERR(child));
-
- if (result == -ENOENT) {
- /* not found and with MDS_OPEN_CREAT: let's create something */
- result = mdo_create(info->mti_ctxt,
- mdt_object_child(parent),
- rr->rr_name,
- mdt_object_child(child),
- &info->mti_attr);
- intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
- if (result != 0)
- GOTO(out_child, result);
- created = 1;
- } else {
- result = mo_attr_get(info->mti_ctxt,
- mdt_object_child(child),
- &info->mti_attr);
- if (result != 0)
- GOTO(destroy_child, result);
- }
-
- mdt_pack_attr2body(body, &info->mti_attr);
- body->fid1 = *mdt_object_fid(child);
- body->valid |= OBD_MD_FLID;
-
- /* we should return "struct lov_mds_md" back*/
- lmm = req_capsule_server_get(&info->mti_pill,
- &RMF_MDT_MD);
-
- /* not supported yet in MDD
- result = mo_xattr_get(info->mti_ctxt, mdt_object_child(child),
- lmm, MAX_MD_SIZE, "lov");
- if (result < 0)
- GOTO(destroy_child, result = -EINVAL);
- */
- if (S_ISDIR(info->mti_attr.la_mode))
- body->valid |= OBD_MD_FLDIREA;
- else
- body->valid |= OBD_MD_FLEASIZE;
- body->eadatasize = result;
- result = 0;
-
- /* FIXME Let me fake it until the underlying works */
- lmm->lmm_magic = LOV_MAGIC; /* magic number = LOV_MAGIC_V1 */
- lmm->lmm_pattern = LOV_PATTERN_RAID0; /* LOV_PATTERN_RAID0, LOV_PATTERN_RAID1 */
- lmm->lmm_object_id = 1; /* LOV object ID */
- lmm->lmm_object_gr = 1; /* LOV object group */
- lmm->lmm_stripe_size = 4096 * 1024; /* size of stripe in bytes */
- lmm->lmm_stripe_count = 1; /* num stripes in use for this object */
- /* per-stripe data */
- lmm->lmm_objects[0].l_object_id = 1; /* OST object ID */
- lmm->lmm_objects[0].l_object_gr = 1; /* OST object group (creating MDS number) */
- lmm->lmm_objects[0].l_ost_gen = 1; /* generation of this l_ost_idx */
- lmm->lmm_objects[0].l_ost_idx = 0; /* OST index in LOV (lov_tgt_desc->tgts) */
- body->eadatasize = sizeof(struct lov_mds_md) + sizeof(struct lov_ost_data);
-
- /* Open it now. */
- /* TODO: not supported yet
- result = mdt_md_open(info, child);
- */
-
-destroy_child:
- if (result != 0 && created)
- mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
- mdt_object_child(child), rr->rr_name);
-out_child:
- mdt_object_put(info->mti_ctxt, child);
-out_parent:
- mdt_object_unlock(mdt->mdt_namespace, parent, lh);
- mdt_object_put(info->mti_ctxt, parent);
- RETURN(result);
-}
-
typedef int (*mdt_reinter)(struct mdt_thread_info *info);