+#include "mds_internal.h"
+
+/* Exported functions from this file are:
+ *
+ * mds_open - called by the intent handler
+ * mds_close - an rpc handling function
+ * mds_pin - an rpc handling function - which will go away
+ * mds_mfd_close - for force closing files when a client dies
+ */
+
+/*
+ * MDS file data handling: file data holds a handle for a file opened
+ * by a client.
+ */
+
+/*
+ * Take one more reference on the mds_file_data passed in as @mfdp.
+ *
+ * The argument is an untyped pointer because this function is handed to
+ * class_handle_hash() as the handle's addref callback.
+ */
+static void mds_mfd_addref(void *mfdp)
+{
+        struct mds_file_data *mfd = (struct mds_file_data *)mfdp;
+        int count;
+
+        atomic_inc(&mfd->mfd_refcount);
+        count = atomic_read(&mfd->mfd_refcount);
+        CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd, count);
+}
+
+/*
+ * Allocate a new mds_file_data and register its embedded handle.
+ *
+ * The refcount starts at 2, and the handle is registered through
+ * class_handle_hash() with mds_mfd_addref() as its addref callback.
+ *
+ * Returns the new mfd, or NULL (after logging an error) when the
+ * allocation fails.
+ */
+struct mds_file_data *mds_mfd_new(void)
+{
+        struct mds_file_data *mfd;
+
+        OBD_ALLOC(mfd, sizeof *mfd);
+        if (mfd != NULL) {
+                atomic_set(&mfd->mfd_refcount, 2);
+                INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
+                class_handle_hash(&mfd->mfd_handle, mds_mfd_addref);
+        } else {
+                CERROR("mds: out of memory\n");
+        }
+
+        return mfd;
+}
+
+/*
+ * Translate a client-visible handle into its mds_file_data.
+ *
+ * @handle must not be NULL.  Returns whatever class_handle2object()
+ * yields for the handle's cookie.
+ */
+static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
+{
+        struct mds_file_data *mfd;
+        ENTRY;
+
+        LASSERT(handle != NULL);
+        mfd = class_handle2object(handle->cookie);
+        RETURN(mfd);
+}
+
+/*
+ * Drop one reference on @mfd and free it once the last reference is gone.
+ *
+ * The refcount is sanity-checked first: it must be positive and below
+ * 0x5a5a.
+ */
+static void mds_mfd_put(struct mds_file_data *mfd)
+{
+        CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
+               atomic_read(&mfd->mfd_refcount) - 1);
+        LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
+                atomic_read(&mfd->mfd_refcount) < 0x5a5a);
+
+        if (!atomic_dec_and_test(&mfd->mfd_refcount))
+                return;
+
+        /* That was the last reference; the handle must already be off
+         * its list before the memory is released. */
+        LASSERT(list_empty(&mfd->mfd_handle.h_link));
+        OBD_FREE(mfd, sizeof *mfd);
+}
+
+/*
+ * Tear down @mfd: unregister its handle with class_handle_unhash(), then
+ * drop one reference with mds_mfd_put().
+ */
+static void mds_mfd_destroy(struct mds_file_data *mfd)
+{
+        /* Unhash first, then release the reference. */
+        class_handle_unhash(&mfd->mfd_handle);
+
+        mds_mfd_put(mfd);
+}
+
+
+/*
+ * Allocate the mds_filter_data hanging off @inode and pin the inode.
+ *
+ * Caller must hold mds->mds_epoch_sem.  @inode must not already have
+ * filterdata attached.
+ *
+ * Returns 0 on success, or -ENOMEM if the allocation fails.
+ */
+static int mds_alloc_filterdata(struct inode *inode)
+{
+        struct inode *grabbed;
+
+        LASSERT(inode->i_filterdata == NULL);
+        OBD_ALLOC(inode->i_filterdata, sizeof(struct mds_filter_data));
+        if (inode->i_filterdata == NULL)
+                return -ENOMEM;
+
+        /* Take the inode reference outside of LASSERT() so that it is still
+         * taken when assertions are compiled out; mds_free_filterdata()
+         * unconditionally drops it again with iput(). */
+        grabbed = igrab(inode);
+        LASSERT(grabbed == inode);
+        return 0;
+}
+
+/*
+ * Release the mds_filter_data attached to @inode and drop an inode
+ * reference with iput().
+ *
+ * Caller must hold mds->mds_epoch_sem.  @inode must have filterdata
+ * attached.
+ */
+static void mds_free_filterdata(struct inode *inode)
+{
+        LASSERT(inode->i_filterdata != NULL);
+
+        /* Free the data and clear the pointer. */
+        OBD_FREE(inode->i_filterdata, sizeof(struct mds_filter_data));
+        inode->i_filterdata = NULL;
+
+        iput(inode);
+}
+
+/*
+ * Write access to a file: executors cause a negative count,
+ * writers a positive count.  The semaphore is needed to perform
+ * a check for the sign and then increment or decrement atomically.
+ *
+ * This code is closely tied to the allocation of the inode's filterdata
+ * and the MDS epoch, so we use the same semaphore for the whole lot.
+ *
+ * We could use a different semaphore for each file, if it ever shows
+ * up in a profile, which it won't.
+ *
+ * If the inode already carries a nonzero io_epoch, that epoch is continued.
+ * Otherwise a new one is started: mds->mds_io_epoch is set to @epoch when
+ * @epoch is larger, or incremented when it is not.  @epoch is nonzero
+ * during recovery.
+ *
+ * Returns 0 on success (i_writecount has been incremented), -ETXTBSY if
+ * i_writecount is negative, or the error from mds_alloc_filterdata().
+ */
+static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
+                                __u64 epoch)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&mds->mds_epoch_sem);
+
+        if (atomic_read(&inode->i_writecount) < 0) {
+                up(&mds->mds_epoch_sem);
+                RETURN(-ETXTBSY);
+        }
+
+        if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) {
+                CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n",
+                       MDS_FILTERDATA(inode)->io_epoch, inode->i_ino,
+                       inode->i_generation);
+                goto out;
+        }
+
+        if (inode->i_filterdata == NULL) {
+                /* Propagate the allocator's own error code rather than
+                 * re-deriving it from the pointer. */
+                rc = mds_alloc_filterdata(inode);
+                if (rc)
+                        goto out;
+        }
+        if (epoch > mds->mds_io_epoch)
+                mds->mds_io_epoch = epoch;
+        else
+                mds->mds_io_epoch++;
+        MDS_FILTERDATA(inode)->io_epoch = mds->mds_io_epoch;
+        CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
+               mds->mds_io_epoch, inode->i_ino, inode->i_generation);
+ out:
+        if (rc == 0)
+                atomic_inc(&inode->i_writecount);
+        up(&mds->mds_epoch_sem);
+        RETURN(rc);
+}
+
+/*
+ * Drop one writer from @inode under mds->mds_epoch_sem.
+ *
+ * Returns the value of i_writecount after the decrement.  When that value
+ * is not positive, the inode's filterdata is released with
+ * mds_free_filterdata().
+ *
+ * The intended "return EAGAIN if the client needs to get size and/or
+ * cookies and close again -- which is never true if the file is about to
+ * be unlinked" behaviour is currently disabled (see the #if 0 block), so
+ * @body and @unlinking are unused for now.
+ */
+static int mds_put_write_access(struct mds_obd *mds, struct inode *inode,
+                                struct mds_body *body, int unlinking)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&mds->mds_epoch_sem);
+        atomic_dec(&inode->i_writecount);
+        rc = atomic_read(&inode->i_writecount);
+        if (rc > 0)
+                GOTO(out, rc);
+#if 0
+        if (!unlinking && !(body->valid & OBD_MD_FLSIZE))
+                GOTO(out, rc = EAGAIN);
+#endif
+        mds_free_filterdata(inode);
+ out:
+        up(&mds->mds_epoch_sem);
+        /* Pair the ENTRY above with RETURN so the trace stays balanced. */
+        RETURN(rc);
+}
+
+/*
+ * Register an executor on @inode by decrementing i_writecount, unless the
+ * count is currently positive.  The check and the decrement are done
+ * together under mds->mds_epoch_sem.
+ *
+ * Returns 0 on success or -ETXTBSY if i_writecount is positive.
+ */
+static int mds_deny_write_access(struct mds_obd *mds, struct inode *inode)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&mds->mds_epoch_sem);
+        if (atomic_read(&inode->i_writecount) > 0)
+                rc = -ETXTBSY;
+        else
+                atomic_dec(&inode->i_writecount);
+        up(&mds->mds_epoch_sem);
+
+        RETURN(rc);
+}
+
+/*
+ * Increment @inode's i_writecount, the inverse of the decrement done by
+ * mds_deny_write_access().  Unlike that function, no semaphore is taken
+ * here.
+ */
+static void mds_allow_write_access(struct inode *inode)
+{
+        ENTRY;
+        atomic_inc(&inode->i_writecount);
+        /* Balance the ENTRY trace record on the way out. */
+        EXIT;
+}
+
+/*
+ * Report the current value of @inode's i_writecount.  Per the convention
+ * used in this file, writers make it positive and executors negative.
+ */
+int mds_query_write_access(struct inode *inode)
+{
+        int writecount;
+        ENTRY;
+
+        writecount = atomic_read(&inode->i_writecount);
+        RETURN(writecount);
+}
+
+/*
+ * This replaces the VFS dentry_open(); it manages the mfd and writecount.
+ *
+ * A new mfd is allocated for @dentry.  For FMODE_WRITE opens, write access
+ * is taken and the inode's io_epoch is stored in the reply body; for
+ * FMODE_EXEC opens, write access is denied instead.  On success the dentry
+ * is pinned with dget(), the inode is marked open for open-unlink handling,
+ * the mfd is added to the export's open list, and its handle cookie is
+ * stored in the reply body.
+ *
+ * Returns the mfd on success.  The caller does not own a reference on it:
+ * the local reference is dropped before returning.  On failure returns
+ * ERR_PTR(-ENOMEM) or ERR_PTR() of the write-access error.
+ */
+static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
+                                             struct vfsmount *mnt, int flags,
+                                             struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_obd *mds = mds_req2mds(req);
+        struct mds_file_data *mfd;
+        struct mds_body *body;
+        int error;
+        ENTRY;
+
+        /* mds_mfd_new() already logs the allocation failure. */
+        mfd = mds_mfd_new();
+        if (mfd == NULL)
+                GOTO(cleanup_dentry, error = -ENOMEM);
+
+        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+
+        if (flags & FMODE_WRITE) {
+                /* FIXME: in recovery, need to pass old epoch here */
+                error = mds_get_write_access(mds, dentry->d_inode, 0);
+                if (error)
+                        GOTO(cleanup_mfd, error);
+                body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch;
+        } else if (flags & FMODE_EXEC) {
+                error = mds_deny_write_access(mds, dentry->d_inode);
+                if (error)
+                        GOTO(cleanup_mfd, error);
+        }
+
+        dget(dentry);
+
+        /* Mark the file as open to handle open-unlink. */
+        mds_open_orphan_inc(dentry->d_inode);
+
+        mfd->mfd_mode = flags;
+        mfd->mfd_dentry = dentry;
+        mfd->mfd_xid = req->rq_xid;
+
+        /* Copy the cookie out while we still hold our own reference; once
+         * the mfd is on the open list and our reference is dropped we must
+         * not dereference it any more. */
+        body->handle.cookie = mfd->mfd_handle.h_cookie;
+
+        spin_lock(&med->med_open_lock);
+        list_add(&mfd->mfd_list, &med->med_open_head);
+        spin_unlock(&med->med_open_lock);
+        mds_mfd_put(mfd);
+
+        RETURN(mfd);
+
+cleanup_mfd:
+        /* Drop our reference, then unhash the handle and drop the other
+         * one, which frees the mfd. */
+        mds_mfd_put(mfd);
+        mds_mfd_destroy(mfd);
+cleanup_dentry:
+        RETURN(ERR_PTR(error));
+}
+
+/*
+ * Fill @ids from the striping EA @lmm: for every stripe, the object id is
+ * stored at the slot given by that stripe's OST index.
+ *
+ * @ids is indexed by OST index and is expected to hold desc->ld_tgt_count
+ * entries.  Stripes whose OST index falls outside that range are logged
+ * and skipped instead of being written past the end of @ids.
+ */
+static void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
+                                struct lov_desc *desc)
+{
+        __u32 stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
+        __u32 i;
+
+        for (i = 0; i < stripe_count; i++) {
+                __u32 ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
+
+                if (ost_idx >= desc->ld_tgt_count) {
+                        CERROR("stripe %u: OST index %u out of range "
+                               "(%u targets)\n", (unsigned int)i,
+                               (unsigned int)ost_idx,
+                               (unsigned int)desc->ld_tgt_count);
+                        continue;
+                }
+                ids[ost_idx] = le64_to_cpu(lmm->lmm_objects[i].l_object_id);
+        }
+}
+
+/*
+ * Create the objects backing an opened regular file (via obd_create() on
+ * mds->mds_osc_exp), store the resulting striping EA on the inode with
+ * fsfilt_set_md(), and copy it into reply buffer @offset.
+ *
+ * Nothing is done (return 0, *ids untouched) when the open asked for
+ * MDS_OPEN_DELAY_CREATE, is not for write, the inode is not a regular
+ * file, or the reply body already has OBD_MD_FLEASIZE set.
+ *
+ * @handle: in/out transaction handle; a transaction is started with
+ *          fsfilt_start() when *handle is NULL on entry.
+ * @ids:    out; array of mds_lov_desc.ld_tgt_count object ids allocated
+ *          here.  It is freed and reset to NULL when an error is returned
+ *          through the out_ids path.
+ *
+ * Returns 0 on success or a negative errno.
+ *
+ * Must be called with i_sem held.
+ */
+static int mds_create_objects(struct ptlrpc_request *req, int offset,
+ struct mds_update_record *rec,
+ struct mds_obd *mds, struct obd_device *obd,
+ struct inode *inode, void **handle, obd_id **ids)
+{
+ struct obdo *oa;
+ struct obd_trans_info oti = { 0 };
+ struct mds_body *body;
+ struct lov_stripe_md *lsm = NULL;
+ struct lov_mds_md *lmm = NULL;
+ void *lmm_buf;
+ int rc, lmm_bufsize, lmm_size;
+ ENTRY;
+
+ if (rec->ur_flags & MDS_OPEN_DELAY_CREATE ||
+ !(rec->ur_flags & FMODE_WRITE))
+ RETURN(0);
+
+ body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
+
+ if (!S_ISREG(inode->i_mode))
+ RETURN(0);
+ /* The reply already carries an EA size, so there is nothing to create. */
+ if (body->valid & OBD_MD_FLEASIZE)
+ RETURN(0);
+
+ /* One object-id slot per target in the LOV descriptor. */
+ OBD_ALLOC(*ids, mds->mds_lov_desc.ld_tgt_count * sizeof(**ids));
+ if (*ids == NULL)
+ RETURN(-ENOMEM);
+ oti.oti_objid = *ids;
+
+ /* Reuse the caller's transaction if there is one, else start our own. */
+ if (*handle == NULL)
+ *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL);
+ if (IS_ERR(*handle)) {
+ rc = PTR_ERR(*handle);
+ *handle = NULL;
+ GOTO(out_ids, rc);
+ }
+
+ /* replay case */
+ /* The request supplies the EA itself: record its object ids, copy it
+ * into the reply buffer and store it on the inode. */
+ if (rec->ur_fid2->id) {
+ body->valid |= OBD_MD_FLBLKSZ | OBD_MD_FLEASIZE;
+ lmm_size = rec->ur_eadatalen;
+ lmm = rec->ur_eadata;
+ LASSERT(lmm);
+
+ mds_objids_from_lmm(*ids, lmm, &mds->mds_lov_desc);
+
+ lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
+ lmm_bufsize = req->rq_repmsg->buflens[offset];
+ LASSERT(lmm_buf);
+ LASSERT(lmm_bufsize >= lmm_size);
+ memcpy(lmm_buf, lmm, lmm_size);
+ rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size);
+ /* NOTE(review): a set_md failure is only logged here; the function
+ * still returns 0 and leaves *ids allocated -- confirm this
+ * best-effort behaviour is intended. */
+ if (rc)
+ CERROR("open replay failed to set md:%d\n", rc);
+ RETURN(0);
+ }
+
+ oa = obdo_alloc();
+ if (oa == NULL)
+ GOTO(out_ids, rc = -ENOMEM);
+ oa->o_mode = S_IFREG | 0600;
+ oa->o_id = inode->i_ino;
+ oa->o_generation = inode->i_generation;
+ oa->o_uid = 0; /* must have 0 uid / gid on OST */
+ oa->o_gid = 0;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLTYPE |
+ OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
+ oa->o_size = 0;
+
+ obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME|
+ OBD_MD_FLCTIME);
+
+ /* check if things like lstripe/lfs stripe are sending us the ea */
+ if (rec->ur_flags & MDS_OPEN_HAS_EA) {
+ rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, mds->mds_osc_exp,
+ 0, &lsm, rec->ur_eadata);
+ if (rc)
+ GOTO(out_oa, rc);
+ }
+
+ /* NOTE(review): the error exits below go to out_oa without touching
+ * lsm; if SETSTRIPE or obd_create() left an lsm allocated it may be
+ * leaked -- verify against those callees. */
+ rc = obd_create(mds->mds_osc_exp, oa, &lsm, &oti);
+ if (rc) {
+ /* -ENOSPC is logged at D_INODE; every other failure at D_ERROR. */
+ int level = D_ERROR;
+ if (rc == -ENOSPC)
+ level = D_INODE;
+ CDEBUG(level, "error creating objects for inode %lu: rc = %d\n",
+ inode->i_ino, rc);
+ /* Positive values are not valid errors; map them to -EIO. */
+ if (rc > 0) {
+ CERROR("obd_create returned invalid rc %d\n", rc);
+ rc = -EIO;
+ }
+ GOTO(out_oa, rc);
+ }
+
+ /* The inode already has a size: push it (and the times) to the new
+ * objects. */
+ if (inode->i_size) {
+ oa->o_size = inode->i_size;
+ obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|
+ OBD_MD_FLMTIME| OBD_MD_FLCTIME| OBD_MD_FLSIZE);
+ rc = obd_setattr(mds->mds_osc_exp, oa, lsm, &oti);
+ if (rc) {
+ CERROR("error setting attrs for inode %lu: rc %d\n",
+ inode->i_ino, rc);
+ if (rc > 0) {
+ CERROR("obd_setattr returned bad rc %d\n", rc);
+ rc = -EIO;
+ }
+ GOTO(out_oa, rc);
+ }
+ }
+
+ body->valid |= OBD_MD_FLBLKSZ | OBD_MD_FLEASIZE;
+ obdo_refresh_inode(inode, oa, OBD_MD_FLBLKSZ);
+
+ LASSERT(lsm && lsm->lsm_object_id);
+ lmm = NULL;
+ /* The return value of obd_packmd() is used as the packed EA size. */
+ rc = obd_packmd(mds->mds_osc_exp, &lmm, lsm);
+ /* The replay case returned above, so this condition always holds here. */
+ if (!rec->ur_fid2->id)
+ obd_free_memmd(mds->mds_osc_exp, &lsm);
+ /* NOTE(review): a packmd failure trips this assertion instead of being
+ * returned as an error -- confirm that is intended. */
+ LASSERT(rc >= 0);
+ lmm_size = rc;
+ body->eadatasize = rc;
+ /* rc from set_md becomes the function's return value; the EA is copied
+ * into the reply buffer below regardless of it. */
+ rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size);
+ lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
+ lmm_bufsize = req->rq_repmsg->buflens[offset];
+ LASSERT(lmm_buf);
+ LASSERT(lmm_bufsize >= lmm_size);
+
+ memcpy(lmm_buf, lmm, lmm_size);
+ obd_free_diskmd(mds->mds_osc_exp, &lmm);
+ out_oa:
+ oti_free_cookies(&oti);
+ obdo_free(oa);
+ out_ids:
+ /* On any error the id array is released and the caller sees NULL. */
+ if (rc) {
+ OBD_FREE(*ids, mds->mds_lov_desc.ld_tgt_count * sizeof(**ids));
+ *ids = NULL;
+ }
+ RETURN(rc);
+}
+
+static void reconstruct_open(struct mds_update_record *rec, int offset,
+ struct ptlrpc_request *req,
+ struct lustre_handle *child_lockh)