+/* Exported functions from this file are:
+ *
+ * mds_open - called by the intent handler
+ * mds_close - an rpc handling function
+ * mds_pin - an rpc handling function - which will go away
+ * mds_mfd_close - for force closing files when a client dies
+ */
+
+/*
+ * MDS file data handling: file data holds a handle for a file opened
+ * by a client.
+ */
+
+/* Take one reference on an mds_file_data.
+ *
+ * The argument is an untyped pointer because this function is handed to
+ * class_handle_hash() as the handle's addref callback (see mds_mfd_new()). */
+static void mds_mfd_addref(void *mfdp)
+{
+        struct mds_file_data *file_data = mfdp;
+        atomic_t *refs = &file_data->mfd_refcount;
+
+        atomic_inc(refs);
+        CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", file_data,
+               atomic_read(refs));
+}
+
+/* Allocate a new mds_file_data and register its handle with
+ * class_handle_hash(), using mds_mfd_addref() as the addref callback.
+ *
+ * The refcount starts at 2.  NOTE(review): presumably one reference is for
+ * the handle hash and one for the caller -- confirm against the callers.
+ *
+ * Returns the new mfd, or NULL (after logging) if the allocation failed. */
+struct mds_file_data *mds_mfd_new(void)
+{
+        struct mds_file_data *mfd;
+
+        OBD_ALLOC(mfd, sizeof *mfd);
+        if (mfd != NULL) {
+                INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
+                atomic_set(&mfd->mfd_refcount, 2);
+                class_handle_hash(&mfd->mfd_handle, mds_mfd_addref);
+        } else {
+                CERROR("mds: out of memory\n");
+        }
+
+        return mfd;
+}
+
+/* Translate a lustre_handle into the object registered under its cookie,
+ * via class_handle2object().  The handle pointer itself must not be NULL
+ * (asserted).
+ *
+ * NOTE(review): whether a reference is taken on the returned mfd, and what
+ * comes back for an unknown cookie, is decided by class_handle2object() --
+ * confirm there. */
+static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
+{
+        struct mds_file_data *found;
+        ENTRY;
+
+        LASSERT(handle != NULL);
+        found = class_handle2object(handle->cookie);
+        RETURN(found);
+}
+
+/* Drop one reference on an mfd and free it when the count reaches zero.
+ *
+ * The refcount is asserted to be positive and below 0x5a5a before the
+ * decrement (NOTE(review): 0x5a5a looks like a freed-memory poison check --
+ * confirm).  By the time the last reference goes away the handle must
+ * already be off its list (asserted via list_empty on h_link). */
+static void mds_mfd_put(struct mds_file_data *mfd)
+{
+        CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
+               atomic_read(&mfd->mfd_refcount) - 1);
+        LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
+                atomic_read(&mfd->mfd_refcount) < 0x5a5a);
+
+        /* Other references remain: nothing more to do. */
+        if (!atomic_dec_and_test(&mfd->mfd_refcount))
+                return;
+
+        LASSERT(list_empty(&mfd->mfd_handle.h_link));
+        OBD_FREE(mfd, sizeof *mfd);
+}
+
+/* Tear down an mfd: take its handle out of the handle hash, then drop one
+ * reference with mds_mfd_put(). */
+static void mds_mfd_destroy(struct mds_file_data *mfd)
+{
+        /* Unhash before the put: mds_mfd_put() asserts that h_link is
+         * empty when it frees the mfd. */
+        class_handle_unhash(&mfd->mfd_handle);
+
+        mds_mfd_put(mfd);
+}
+
+
+/* Allocate the mds_filter_data hung off inode->i_filterdata and take an
+ * extra inode reference with igrab() (dropped again with iput() in
+ * mds_free_filterdata()).
+ *
+ * The inode must not already have filter data (asserted).
+ * Caller must hold mds->mds_epoch_sem.
+ *
+ * Returns 0 on success, -ENOMEM if the allocation failed. */
+static int mds_alloc_filterdata(struct inode *inode)
+{
+        struct inode *grabbed;
+
+        LASSERT(inode->i_filterdata == NULL);
+        OBD_ALLOC(inode->i_filterdata, sizeof(struct mds_filter_data));
+        if (inode->i_filterdata == NULL)
+                return -ENOMEM;
+
+        /* Take the reference outside LASSERT(): an assertion must not carry
+         * the side effect, or the reference silently disappears in any build
+         * where assertions are compiled out. */
+        grabbed = igrab(inode);
+        LASSERT(grabbed == inode);
+        return 0;
+}
+
+/* Free the mds_filter_data attached to the inode, clear the pointer and
+ * drop one inode reference with iput().
+ *
+ * The inode must currently have filter data (asserted).
+ * Caller must hold mds->mds_epoch_sem. */
+static void mds_free_filterdata(struct inode *inode)
+{
+        LASSERT(inode->i_filterdata != NULL);
+
+        /* Release the data and clear the pointer before giving up the
+         * inode reference. */
+        OBD_FREE(inode->i_filterdata, sizeof(struct mds_filter_data));
+        inode->i_filterdata = NULL;
+
+        iput(inode);
+}
+
+/* Write access to a file: executors cause a negative count,
+ * writers a positive count.  The semaphore is needed to perform
+ * a check for the sign and then increment or decrement atomically.
+ *
+ * This code is closely tied to the allocation of the inode's i_filterdata
+ * and the MDS epoch, so we use the same semaphore for the whole lot.
+ *
+ * We could use a different semaphore for each file, if it ever shows
+ * up in a profile, which it won't.
+ *
+ * If the inode already carries a non-zero io_epoch, that epoch is continued.
+ * Otherwise filter data is allocated if needed and a new epoch is started:
+ * the given epoch if it is newer than mds->mds_io_epoch, else the next one.
+ *
+ * epoch argument is nonzero during recovery.
+ *
+ * Returns 0 with i_writecount incremented, -ETXTBSY if i_writecount is
+ * negative (the file has executors), or -ENOMEM if the filter data could
+ * not be allocated. */
+static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
+                                __u64 epoch)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&mds->mds_epoch_sem);
+
+        if (atomic_read(&inode->i_writecount) < 0) {
+                up(&mds->mds_epoch_sem);
+                RETURN(-ETXTBSY);
+        }
+
+        if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) {
+                CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n",
+                       MDS_FILTERDATA(inode)->io_epoch, inode->i_ino,
+                       inode->i_generation);
+                goto out;
+        }
+
+        /* Use the allocator's own return code instead of re-testing the
+         * pointer afterwards. */
+        if (inode->i_filterdata == NULL) {
+                rc = mds_alloc_filterdata(inode);
+                if (rc != 0)
+                        goto out;
+        }
+
+        if (epoch > mds->mds_io_epoch)
+                mds->mds_io_epoch = epoch;
+        else
+                mds->mds_io_epoch++;
+        MDS_FILTERDATA(inode)->io_epoch = mds->mds_io_epoch;
+        CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
+               mds->mds_io_epoch, inode->i_ino, inode->i_generation);
+ out:
+        /* Only a successful caller is counted as a writer. */
+        if (rc == 0)
+                atomic_inc(&inode->i_writecount);
+        up(&mds->mds_epoch_sem);
+        RETURN(rc);
+}
+
+/* Drop one writer from the inode under mds->mds_epoch_sem.  When no writers
+ * remain (i_writecount is not positive after the decrement) the inode's
+ * filter data is freed.
+ *
+ * Returns the value of i_writecount after the decrement, i.e. the number
+ * of remaining writers.
+ *
+ * The EAGAIN case -- the client needs to get size and/or cookies and close
+ * again, which is never true if the file is about to be unlinked -- is
+ * currently compiled out with #if 0, so body and unlinking are unused. */
+static int mds_put_write_access(struct mds_obd *mds, struct inode *inode,
+                                struct mds_body *body, int unlinking)
+{
+        int rc;
+        ENTRY;
+
+        down(&mds->mds_epoch_sem);
+        atomic_dec(&inode->i_writecount);
+        rc = atomic_read(&inode->i_writecount);
+        if (rc > 0)
+                GOTO(out, rc);
+#if 0
+        if (!unlinking && !(body->valid & OBD_MD_FLSIZE))
+                GOTO(out, rc = EAGAIN);
+#endif
+        mds_free_filterdata(inode);
+ out:
+        up(&mds->mds_epoch_sem);
+        /* Pair the ENTRY above with RETURN so the trace log stays balanced. */
+        RETURN(rc);
+}
+
+/* Register an executor on the inode: under mds->mds_epoch_sem, refuse if
+ * the file currently has writers (positive i_writecount), otherwise push
+ * i_writecount one step further negative.
+ *
+ * Returns 0 on success or -ETXTBSY if writers are present. */
+static int mds_deny_write_access(struct mds_obd *mds, struct inode *inode)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&mds->mds_epoch_sem);
+        if (atomic_read(&inode->i_writecount) > 0)
+                rc = -ETXTBSY;
+        else
+                atomic_dec(&inode->i_writecount);
+        up(&mds->mds_epoch_sem);
+
+        RETURN(rc);
+}
+
+/* Undo mds_deny_write_access(): bump i_writecount back up by one.
+ * Unlike the other write-access helpers this does not take
+ * mds->mds_epoch_sem. */
+static void mds_allow_write_access(struct inode *inode)
+{
+        ENTRY;
+        atomic_inc(&inode->i_writecount);
+        /* Pair the ENTRY above so the trace log stays balanced. */
+        EXIT;
+}
+
+/* Report the inode's current i_writecount without taking any lock.
+ * Per the write-access convention in this file, a positive value means
+ * writers and a negative value means executors. */
+int mds_query_write_access(struct inode *inode)
+{
+        int writecount;
+        ENTRY;
+
+        writecount = atomic_read(&inode->i_writecount);
+        RETURN(writecount);
+}
+
+/* This replaces the VFS dentry_open; it manages mfd and writecount.
+ *
+ * Allocates an mfd for the open, takes write access (FMODE_WRITE, which
+ * also stores the inode's io_epoch in the reply body) or denies write
+ * access (FMODE_EXEC), takes a dentry reference, marks the inode open for
+ * open-unlink handling, links the mfd on the export's open list and stores
+ * the mfd handle cookie in the reply body (reply buffer 1).
+ *
+ * mnt is not used here.
+ *
+ * Returns the mfd on success.  The local reference from mds_mfd_new() has
+ * been dropped by then, so the refcount is 1.  On failure returns
+ * ERR_PTR(-ENOMEM) or ERR_PTR() of the write-access error; it never
+ * returns NULL. */
+static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
+                                             struct vfsmount *mnt, int flags,
+                                             struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_obd *mds = mds_req2mds(req);
+        struct mds_file_data *mfd;
+        struct mds_body *body;
+        int error;
+        ENTRY;
+
+        mfd = mds_mfd_new();
+        if (mfd == NULL) {
+                CERROR("mds: out of memory\n");
+                GOTO(cleanup_dentry, error = -ENOMEM);
+        }
+
+        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+
+        if (flags & FMODE_WRITE) {
+                /* FIXME: in recovery, need to pass old epoch here */
+                error = mds_get_write_access(mds, dentry->d_inode, 0);
+                if (error)
+                        GOTO(cleanup_mfd, error);
+                body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch;
+        } else if (flags & FMODE_EXEC) {
+                error = mds_deny_write_access(mds, dentry->d_inode);
+                if (error)
+                        GOTO(cleanup_mfd, error);
+        }
+
+        dget(dentry);
+
+        /* Mark the file as open to handle open-unlink. */
+        mds_open_orphan_inc(dentry->d_inode);
+
+        mfd->mfd_mode = flags;
+        mfd->mfd_dentry = dentry;
+        mfd->mfd_xid = req->rq_xid;
+
+        spin_lock(&med->med_open_lock);
+        list_add(&mfd->mfd_list, &med->med_open_head);
+        spin_unlock(&med->med_open_lock);
+
+        /* Read the cookie while we still hold our own reference; once it
+         * is dropped below we no longer pin the mfd ourselves. */
+        body->handle.cookie = mfd->mfd_handle.h_cookie;
+        mds_mfd_put(mfd);
+
+        RETURN(mfd);
+
+cleanup_mfd:
+        /* Drop our reference, then unhash and drop the remaining one so
+         * the mfd is freed. */
+        mds_mfd_put(mfd);
+        mds_mfd_destroy(mfd);
+cleanup_dentry:
+        /* Pair the ENTRY above with RETURN so the trace log stays balanced. */
+        RETURN(ERR_PTR(error));
+}
+
+/* Scatter the object ids recorded in a LOV EA into ids[], indexed by each
+ * stripe's OST index.
+ *
+ * ids:  array with one slot per LOV target; the caller in this file
+ *       allocates desc->ld_tgt_count entries.
+ * lmm:  on-disk (little-endian) LOV EA supplying stripe count, OST indices
+ *       and object ids.
+ * desc: LOV descriptor; ld_tgt_count bounds the valid OST indices.
+ *
+ * Stripes whose OST index is outside the descriptor are logged and
+ * skipped rather than written past the end of ids[]. */
+static void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
+                                struct lov_desc *desc)
+{
+        __u32 stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
+        __u32 i;
+
+        for (i = 0; i < stripe_count; i++) {
+                __u32 ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
+
+                /* The EA may come from a replayed client request; never
+                 * trust its indices to fit the ids array. */
+                if (ost_idx >= desc->ld_tgt_count) {
+                        CERROR("stripe %u: OST index %u out of range "
+                               "(%u targets), skipping\n",
+                               i, ost_idx, desc->ld_tgt_count);
+                        continue;
+                }
+
+                ids[ost_idx] = le64_to_cpu(lmm->lmm_objects[i].l_object_id);
+        }
+}
+
+/* Must be called with i_sem held
+ *
+ * Create (or, on replay, re-attach) the OST objects behind a regular file
+ * that is being opened for write.  The resulting LOV EA is stored on the
+ * inode with fsfilt_set_md() and copied into reply buffer 'offset'.
+ *
+ * Returns 0 without doing anything when MDS_OPEN_DELAY_CREATE is set, the
+ * open is not for write, the inode is not a regular file, or the reply
+ * body is already flagged OBD_MD_FLEASIZE.
+ *
+ * req/offset: the EA is copied into reply buffer 'offset'; the mds_body in
+ *             reply buffer 1 gets its valid bits and eadatasize updated.
+ * rec:        open record; ur_flags/ur_eadata select the setstripe
+ *             (MDS_OPEN_HAS_EA) or existing-objects (MDS_OPEN_HAS_OBJS)
+ *             variants, and ur_eadata is the EA used on replay.
+ * handle:     in/out fsfilt transaction handle; started here with
+ *             FSFILT_OP_CREATE if *handle is NULL on entry, and reset to
+ *             NULL if starting it fails.
+ * ids:        out; array of mds_lov_desc.ld_tgt_count object ids allocated
+ *             here.  Freed and set to NULL when a non-zero rc is returned
+ *             through out_ids.
+ *
+ * Returns 0 on success (and on the early exits above), otherwise an error
+ * code.  The replay branch returns 0 even when fsfilt_set_md() fails. */
+static int mds_create_objects(struct ptlrpc_request *req, int offset,
+ struct mds_update_record *rec,
+ struct mds_obd *mds, struct obd_device *obd,
+ struct dentry *dchild, void **handle,
+ obd_id **ids)
+{
+ struct obdo *oa;
+ struct obd_trans_info oti = { 0 };
+ struct mds_body *body;
+ struct lov_stripe_md *lsm = NULL;
+ struct lov_mds_md *lmm = NULL;
+ struct inode *inode = dchild->d_inode;
+ void *lmm_buf;
+ int rc, lmm_bufsize, lmm_size;
+ ENTRY;
+
+ if (rec->ur_flags & MDS_OPEN_DELAY_CREATE ||
+ !(rec->ur_flags & FMODE_WRITE))
+ RETURN(0); /* creation delayed, or not a write open */
+
+ body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
+
+ if (!S_ISREG(inode->i_mode))
+ RETURN(0); /* only regular files get objects here */
+ if (body->valid & OBD_MD_FLEASIZE)
+ RETURN(0); /* reply already flagged as carrying an EA */
+
+ OBD_ALLOC(*ids, mds->mds_lov_desc.ld_tgt_count * sizeof(**ids));
+ if (*ids == NULL)
+ RETURN(-ENOMEM);
+ oti.oti_objid = *ids; /* presumably filled in by obd_create() through
+                       * the oti -- confirm in the OSC/LOV code */
+
+ if (*handle == NULL)
+ *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL);
+ if (IS_ERR(*handle)) {
+ rc = PTR_ERR(*handle);
+ *handle = NULL; /* do not hand an ERR_PTR back to the caller */
+ GOTO(out_ids, rc);
+ }
+
+ /* replay case: reuse the EA sent in the request (ur_eadata) instead of
+  * creating new objects */
+ if(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ LASSERT (rec->ur_fid2->id);
+ body->valid |= OBD_MD_FLBLKSZ | OBD_MD_FLEASIZE;
+ lmm_size = rec->ur_eadatalen;
+ lmm = rec->ur_eadata;
+ LASSERT(lmm);
+
+ mds_objids_from_lmm(*ids, lmm, &mds->mds_lov_desc);
+
+ lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
+ lmm_bufsize = req->rq_repmsg->buflens[offset];
+ LASSERT(lmm_buf);
+ LASSERT(lmm_bufsize >= lmm_size);
+ memcpy(lmm_buf, lmm, lmm_size);
+ rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size);
+ if (rc)
+ CERROR("open replay failed to set md:%d\n", rc);
+ RETURN(0); /* NOTE: a fsfilt_set_md() failure is only logged; the
+             * replay branch still reports success */
+ }
+
+ if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_ALLOC_OBDO)) /* fault-injection hook */
+ GOTO(out_ids, rc = -ENOMEM);
+
+ oa = obdo_alloc();
+ if (oa == NULL)
+ GOTO(out_ids, rc = -ENOMEM);
+ oa->o_mode = S_IFREG | 0600;
+ oa->o_id = inode->i_ino;
+ oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+ oa->o_generation = inode->i_generation;
+ oa->o_uid = 0; /* must have 0 uid / gid on OST */
+ oa->o_gid = 0;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLTYPE |
+ OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
+ oa->o_size = 0;
+
+ obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME|
+ OBD_MD_FLCTIME);
+
+ if (!(rec->ur_flags & MDS_OPEN_HAS_OBJS)) {
+ /* check if things like lfs setstripe are sending us the ea */
+ if (rec->ur_flags & MDS_OPEN_HAS_EA) {
+ rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
+ mds->mds_osc_exp,
+ 0, &lsm, rec->ur_eadata);
+ if (rc)
+ GOTO(out_oa, rc);
+ }
+ LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
+ rc = obd_create(mds->mds_osc_exp, oa, &lsm, &oti);
+ if (rc) {
+ int level = D_ERROR;
+ if (rc == -ENOSPC) /* out of space is logged at D_INODE, not D_ERROR */
+ level = D_INODE;
+ CDEBUG(level, "error creating objects for "
+ "inode %lu: rc = %d\n",
+ inode->i_ino, rc);
+ if (rc > 0) { /* positive rc is not a valid errno: map it to -EIO */
+ CERROR("obd_create returned invalid "
+ "rc %d\n", rc);
+ rc = -EIO;
+ }
+ GOTO(out_oa, rc);
+ }
+ } else { /* objects already exist: only install the EA from the request */
+ rc = obd_iocontrol(OBD_IOC_LOV_SETEA, mds->mds_osc_exp,
+ 0, &lsm, rec->ur_eadata);
+ if (rc) {
+ GOTO(out_oa, rc);
+ }
+ lsm->lsm_object_id = oa->o_id;
+ lsm->lsm_object_gr = oa->o_gr;
+ }
+ if (inode->i_size) { /* inode already has a size: push size and times
+                       * to the objects with obd_setattr() */
+ oa->o_size = inode->i_size;
+ obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|
+ OBD_MD_FLMTIME| OBD_MD_FLCTIME| OBD_MD_FLSIZE);
+ rc = obd_setattr(mds->mds_osc_exp, oa, lsm, &oti);
+ if (rc) {
+ CERROR("error setting attrs for inode %lu: rc %d\n",
+ inode->i_ino, rc);
+ if (rc > 0) { /* positive rc is not a valid errno: map it to -EIO */
+ CERROR("obd_setattr returned bad rc %d\n", rc);
+ rc = -EIO;
+ }
+ GOTO(out_oa, rc); /* NOTE(review): lsm does not appear to be released
+                    * on this path -- confirm that is intended */
+ }
+ }
+
+ body->valid |= OBD_MD_FLBLKSZ | OBD_MD_FLEASIZE;
+ obdo_refresh_inode(inode, oa, OBD_MD_FLBLKSZ);
+
+ LASSERT(lsm && lsm->lsm_object_id);
+ lmm = NULL;
+ rc = obd_packmd(mds->mds_osc_exp, &lmm, lsm); /* rc is the packed EA size */
+ if (!rec->ur_fid2->id) /* NOTE(review): lsm is only freed here when
+                         * ur_fid2->id is zero; who owns it otherwise is
+                         * not visible in this function */
+ obd_free_memmd(mds->mds_osc_exp, &lsm);
+ LASSERT(rc >= 0);
+ lmm_size = rc;
+ body->eadatasize = rc;
+ rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size); /* this rc is the
+                                                          * value finally
+                                                          * returned */
+ lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
+ lmm_bufsize = req->rq_repmsg->buflens[offset];
+ LASSERT(lmm_buf);
+ LASSERT(lmm_bufsize >= lmm_size);
+
+ memcpy(lmm_buf, lmm, lmm_size); /* copied even if fsfilt_set_md() failed */
+ obd_free_diskmd(mds->mds_osc_exp, &lmm);
+ out_oa: /* free the oti cookies and the obdo */
+ oti_free_cookies(&oti);
+ obdo_free(oa);
+ out_ids: /* on failure the ids array is freed and *ids reset to NULL */
+ if (rc) {
+ OBD_FREE(*ids, mds->mds_lov_desc.ld_tgt_count * sizeof(**ids));
+ *ids = NULL;
+ }
+ RETURN(rc);
+}
+
+/* Rebuild the reply for an intent open from the result saved in the
+ * client's mds_client_data, instead of executing the open again.
+ *
+ * rec:         the open record (parent fid, child name, open flags).
+ * offset:      must be 2; this is only called via the intent path.
+ * req:         the request; rq_status and the reply buffers are filled in.
+ * child_lockh: not used by this function.
+ *
+ * The saved status, transno and disposition are restored first.  If the
+ * original request got as far as a successful open, the mfd created by it
+ * is looked up on the export's open list by xid and its cookie is returned
+ * again; if no such mfd exists the file is opened afresh.  A failure of
+ * that re-open is reported to the client through req->rq_status. */
+static void reconstruct_open(struct mds_update_record *rec, int offset,
+                             struct ptlrpc_request *req,
+                             struct lustre_handle *child_lockh)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_client_data *mcd = med->med_mcd;
+        struct mds_obd *mds = mds_req2mds(req);
+        struct mds_file_data *mfd;
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct dentry *parent, *child;
+        struct ldlm_reply *rep;
+        struct mds_body *body;
+        int rc;
+        struct list_head *t;
+        int put_child = 1;
+        ENTRY;
+
+        LASSERT(offset == 2);                  /* only called via intent */
+        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+
+        /* copy rc, transno and disp; steal locks */
+        mds_req_from_mcd(req, mcd);
+        intent_set_disposition(rep, mcd->mcd_last_data);
+
+        /* Only replay if create or open actually happened. */
+        if (!intent_disposition(rep, DISP_OPEN_CREATE | DISP_OPEN_OPEN) ) {
+                EXIT;
+                return; /* error looking up parent or child */
+        }
+
+        parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+        LASSERT(!IS_ERR(parent));
+
+        child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
+        LASSERT(!IS_ERR(child));
+
+        if (!child->d_inode) {
+                GOTO(out_dput, 0); /* child not present to open */
+        }
+
+        /* At this point, we know we have a child. We'll send
+         * it back _unless_ it not created and open failed.
+         */
+        if (intent_disposition(rep, DISP_OPEN_OPEN) &&
+            !intent_disposition(rep, DISP_OPEN_CREATE) &&
+            req->rq_status) {
+                GOTO(out_dput, 0);
+        }
+
+        /* get lock (write for O_CREAT, read otherwise) */
+
+        mds_pack_inode2fid(obd, &body->fid1, child->d_inode);
+        mds_pack_inode2body(obd, body, child->d_inode);
+        if (S_ISREG(child->d_inode->i_mode)) {
+                rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
+                                 child->d_inode, 1);
+
+                if (rc)
+                        LASSERT(rc == req->rq_status);
+
+                /* If we have LOV EA data, the OST holds size, mtime */
+                if (!(body->valid & OBD_MD_FLEASIZE))
+                        body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
+        } else {
+                /* XXX need to check this case */
+        }
+
+        /* If we're opening a file without an EA, change to a write
+           lock (unless we already have one). */
+
+        /* If we have -EEXIST as the status, and we were asked to create
+         * exclusively, we can tell we failed because the file already existed.
+         */
+        if (req->rq_status == -EEXIST &&
+            ((rec->ur_flags & (MDS_OPEN_CREAT | MDS_OPEN_EXCL)) ==
+             (MDS_OPEN_CREAT | MDS_OPEN_EXCL))) {
+                GOTO(out_dput, 0);
+        }
+
+        /* If we didn't get as far as trying to open, then some locking thing
+         * probably went wrong, and we'll just bail here.
+         */
+        if (!intent_disposition(rep, DISP_OPEN_OPEN))
+                GOTO(out_dput, 0);
+
+        /* If we failed, then we must have failed opening, so don't look for
+         * file descriptor or anything, just give the client the bad news.
+         */
+        if (req->rq_status)
+                GOTO(out_dput, 0);
+
+        /* Look for the mfd the original request created, matching on xid.
+         * NOTE(review): the list is walked without med_open_lock, while
+         * mds_dentry_open() adds to it under that lock -- confirm that
+         * this is safe here. */
+        mfd = NULL;
+        list_for_each(t, &med->med_open_head) {
+                mfd = list_entry(t, struct mds_file_data, mfd_list);
+                if (mfd->mfd_xid == req->rq_xid)
+                        break;
+                mfd = NULL;
+        }
+
+        /* #warning "XXX fixme" bug 2991 */
+        /* Here it used to LASSERT(mfd) if exp_outstanding_reply != NULL.
+         * Now that exp_outstanding_reply is a list, it's just using mfd != NULL
+         * to detect a re-open */
+        if (mfd == NULL) {
+                mntget(mds->mds_vfsmnt);
+                CERROR("Re-opened file \n");
+                mfd = mds_dentry_open(child, mds->mds_vfsmnt,
+                                      rec->ur_flags & ~MDS_OPEN_TRUNC, req);
+                /* mds_dentry_open() reports failure with ERR_PTR(), never
+                 * NULL, so test with IS_ERR() and pass the real error on.
+                 * NOTE(review): the mntget() above is not undone on this
+                 * path -- confirm whether an mntput() is needed. */
+                if (IS_ERR(mfd)) {
+                        rc = PTR_ERR(mfd);
+                        CERROR("mds: re-open failed: rc = %d\n", rc);
+                        GOTO(out_dput, req->rq_status = rc);
+                }
+                /* The lookup reference on child is kept after a
+                 * successful re-open. */
+                put_child = 0;
+        } else {
+                body->handle.cookie = mfd->mfd_handle.h_cookie;
+                CDEBUG(D_INODE, "resend mfd %p, cookie "LPX64"\n", mfd,
+                       mfd->mfd_handle.h_cookie);
+        }
+
+ out_dput:
+        if (put_child)
+                l_dput(child);
+        l_dput(parent);
+        EXIT;
+}
+
+/* Map open flags to the MAY_* permission to check.
+ *
+ * FMODE_EXEC wins outright: the result is MAY_EXEC alone, never or'ed with
+ * the read/write bits (do NOT or the MAY_*'s, you'll get the weakest).
+ * Otherwise FMODE_READ contributes MAY_READ, and FMODE_WRITE or
+ * MDS_OPEN_TRUNC contributes MAY_WRITE. */
+static int accmode(int flags)
+{
+        int mode;
+
+        if (flags & FMODE_EXEC)
+                return MAY_EXEC;
+
+        mode = (flags & FMODE_READ) ? MAY_READ : 0;
+        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
+                mode |= MAY_WRITE;
+
+        return mode;
+}
+
+/* Handles object creation, actual opening, and I/O epoch */
+static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
+ struct mds_body *body, int flags, void **handle,
+ struct mds_update_record *rec,struct ldlm_reply *rep)