Whamcloud - gitweb
LU-3544 nfs: writing to new files will return ENOENT
[fs/lustre-release.git] / lustre / llite / file.c
index e0611c9..87b30b4 100644 (file)
@@ -251,6 +251,24 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode,
         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
 
+       if (fd->fd_lease_och != NULL) {
+               bool lease_broken;
+
+               /* Usually the lease is not released when the
+                * application crashed, we need to release here. */
+               rc = ll_lease_close(fd->fd_lease_och, inode, &lease_broken);
+               CDEBUG(rc ? D_ERROR : D_INODE, "Clean up lease "DFID" %d/%d\n",
+                       PFID(&lli->lli_fid), rc, lease_broken);
+
+               fd->fd_lease_och = NULL;
+       }
+
+       if (fd->fd_och != NULL) {
+               rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och);
+               fd->fd_och = NULL;
+               GOTO(out, rc);
+       }
+
         /* Let's see if we have good enough OPEN lock on the file and if
            we can skip talking to MDS */
         if (file->f_dentry->d_inode) { /* Can this ever be false? */
@@ -287,11 +305,12 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode,
                        file, file->f_dentry, file->f_dentry->d_name.name);
         }
 
-        LUSTRE_FPRIVATE(file) = NULL;
-        ll_file_data_put(fd);
-        ll_capa_close(inode);
+out:
+       LUSTRE_FPRIVATE(file) = NULL;
+       ll_file_data_put(fd);
+       ll_capa_close(inode);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 /* While this returns an error code, fput() the caller does not, so we need
@@ -360,8 +379,6 @@ static int ll_intent_file_open(struct file *file, void *lmm,
 {
         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
         struct dentry *parent = file->f_dentry->d_parent;
-        const char *name = file->f_dentry->d_name.name;
-        const int len = file->f_dentry->d_name.len;
         struct md_op_data *op_data;
         struct ptlrpc_request *req;
         __u32 opc = LUSTRE_OPC_ANY;
@@ -386,9 +403,10 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                         opc = LUSTRE_OPC_CREATE;
         }
 
-        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
-                                      file->f_dentry->d_inode, name, len,
-                                      O_RDWR, opc, NULL);
+       op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
+                                     file->f_dentry->d_inode, NULL, 0,
+                                     O_RDWR, opc, NULL);
+
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
@@ -443,24 +461,20 @@ void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
         }
 }
 
-static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
-                       struct lookup_intent *it, struct obd_client_handle *och)
+static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
+                      struct obd_client_handle *och)
 {
-        struct ptlrpc_request *req = it->d.lustre.it_data;
-        struct mdt_body *body;
-
-        LASSERT(och);
-
-        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-        LASSERT(body != NULL);                      /* reply already checked out */
+       struct ptlrpc_request *req = it->d.lustre.it_data;
+       struct mdt_body *body;
 
-        memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
-        och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
-        och->och_fid = lli->lli_fid;
-        och->och_flags = it->it_flags;
-        ll_ioepoch_open(lli, body->ioepoch);
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       och->och_fh = body->handle;
+       och->och_fid = body->fid1;
+       och->och_lease_handle.cookie = it->d.lustre.it_lock_handle;
+       och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
+       och->och_flags = it->it_flags;
 
-        return md_set_open_replay_data(md_exp, och, req);
+       return md_set_open_replay_data(md_exp, och, req);
 }
 
 int ll_local_open(struct file *file, struct lookup_intent *it,
@@ -479,21 +493,19 @@ int ll_local_open(struct file *file, struct lookup_intent *it,
                 struct mdt_body *body;
                 int rc;
 
-                rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
-                if (rc)
-                        RETURN(rc);
+               rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
+               if (rc != 0)
+                       RETURN(rc);
 
-                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-                if ((it->it_flags & FMODE_WRITE) &&
-                    (body->valid & OBD_MD_FLSIZE))
-                        CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
-                               lli->lli_ioepoch, PFID(&lli->lli_fid));
-        }
+               body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+               ll_ioepoch_open(lli, body->ioepoch);
+       }
 
-        LUSTRE_FPRIVATE(file) = fd;
-        ll_readahead_init(inode, &fd->fd_ras);
-        fd->fd_omode = it->it_flags;
-        RETURN(0);
+       LUSTRE_FPRIVATE(file) = fd;
+       ll_readahead_init(inode, &fd->fd_ras);
+       fd->fd_omode = it->it_flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC);
+
+       RETURN(0);
 }
 
 /* Open a file, and (for the very first open) create objects on the OSTs at
@@ -695,6 +707,199 @@ out_openerr:
         return rc;
 }
 
+static int ll_md_blocking_lease_ast(struct ldlm_lock *lock,
+                       struct ldlm_lock_desc *desc, void *data, int flag)
+{
+       int rc;
+       struct lustre_handle lockh;
+       ENTRY;
+
+       switch (flag) {
+       case LDLM_CB_BLOCKING:
+               ldlm_lock2handle(lock, &lockh);
+               rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
+               if (rc < 0) {
+                       CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
+                       RETURN(rc);
+               }
+               break;
+       case LDLM_CB_CANCELING:
+               /* do nothing */
+               break;
+       }
+       RETURN(0);
+}
+
+/**
+ * Acquire a lease and open the file.
+ */
+struct obd_client_handle *ll_lease_open(struct inode *inode, struct file *file,
+                                       fmode_t fmode)
+{
+       struct lookup_intent it = { .it_op = IT_OPEN };
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct md_op_data *op_data;
+       struct ptlrpc_request *req;
+       struct lustre_handle old_handle = { 0 };
+       struct obd_client_handle *och = NULL;
+       int rc;
+       int rc2;
+       ENTRY;
+
+       if (fmode != FMODE_WRITE && fmode != FMODE_READ)
+               RETURN(ERR_PTR(-EINVAL));
+
+       if (file != NULL) {
+               struct ll_inode_info *lli = ll_i2info(inode);
+               struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+               struct obd_client_handle **och_p;
+               __u64 *och_usecount;
+
+               if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC))
+                       RETURN(ERR_PTR(-EPERM));
+
+               /* Get the openhandle of the file */
+               rc = -EBUSY;
+               mutex_lock(&lli->lli_och_mutex);
+               if (fd->fd_lease_och != NULL) {
+                       mutex_unlock(&lli->lli_och_mutex);
+                       RETURN(ERR_PTR(rc));
+               }
+
+               if (fd->fd_och == NULL) {
+                       if (file->f_mode & FMODE_WRITE) {
+                               LASSERT(lli->lli_mds_write_och != NULL);
+                               och_p = &lli->lli_mds_write_och;
+                               och_usecount = &lli->lli_open_fd_write_count;
+                       } else {
+                               LASSERT(lli->lli_mds_read_och != NULL);
+                               och_p = &lli->lli_mds_read_och;
+                               och_usecount = &lli->lli_open_fd_read_count;
+                       }
+                       if (*och_usecount == 1) {
+                               fd->fd_och = *och_p;
+                               *och_p = NULL;
+                               *och_usecount = 0;
+                               rc = 0;
+                       }
+               }
+               mutex_unlock(&lli->lli_och_mutex);
+               if (rc < 0) /* more than 1 opener */
+                       RETURN(ERR_PTR(rc));
+
+               LASSERT(fd->fd_och != NULL);
+               old_handle = fd->fd_och->och_fh;
+       }
+
+       OBD_ALLOC_PTR(och);
+       if (och == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
+                                       LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data))
+               GOTO(out, rc = PTR_ERR(op_data));
+
+       /* To tell the MDT this openhandle is from the same owner */
+       op_data->op_handle = old_handle;
+
+       it.it_flags = fmode | MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
+       rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, &req,
+                               ll_md_blocking_lease_ast,
+       /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
+        * it can be cancelled which may mislead applications that the lease is
+        * broken;
+        * LDLM_FL_EXCL: Set this flag so that it won't be matched by normal
+        * open in ll_md_blocking_ast(). Otherwise as ll_md_blocking_lease_ast
+        * doesn't deal with openhandle, so normal openhandle will be leaked. */
+                               LDLM_FL_NO_LRU | LDLM_FL_EXCL);
+       ll_finish_md_op_data(op_data);
+       if (req != NULL) {
+               ptlrpc_req_finished(req);
+               it_clear_disposition(&it, DISP_ENQ_COMPLETE);
+       }
+       if (rc < 0)
+               GOTO(out_release_it, rc);
+
+       if (it_disposition(&it, DISP_LOOKUP_NEG))
+               GOTO(out_release_it, rc = -ENOENT);
+
+       rc = it_open_error(DISP_OPEN_OPEN, &it);
+       if (rc)
+               GOTO(out_release_it, rc);
+
+       LASSERT(it_disposition(&it, DISP_ENQ_OPEN_REF));
+       ll_och_fill(sbi->ll_md_exp, &it, och);
+
+       if (!it_disposition(&it, DISP_OPEN_LEASE)) /* old server? */
+               GOTO(out_close, rc = -EOPNOTSUPP);
+
+       /* already get lease, handle lease lock */
+       ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
+       if (it.d.lustre.it_lock_mode == 0 ||
+           it.d.lustre.it_lock_bits != MDS_INODELOCK_OPEN) {
+               /* open lock must return for lease */
+               CERROR(DFID "lease granted but no open lock, %d/%Lu.\n",
+                       PFID(ll_inode2fid(inode)), it.d.lustre.it_lock_mode,
+                       it.d.lustre.it_lock_bits);
+               GOTO(out_close, rc = -EPROTO);
+       }
+
+       ll_intent_release(&it);
+       RETURN(och);
+
+out_close:
+       rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och);
+       if (rc2)
+               CERROR("Close openhandle returned %d\n", rc2);
+
+       /* cancel open lock */
+       if (it.d.lustre.it_lock_mode != 0) {
+               ldlm_lock_decref_and_cancel(&och->och_lease_handle,
+                                               it.d.lustre.it_lock_mode);
+               it.d.lustre.it_lock_mode = 0;
+       }
+out_release_it:
+       ll_intent_release(&it);
+out:
+       OBD_FREE_PTR(och);
+       RETURN(ERR_PTR(rc));
+}
+EXPORT_SYMBOL(ll_lease_open);
+
+/**
+ * Release lease and close the file.
+ * It will check if the lease has ever broken.
+ */
+int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
+                       bool *lease_broken)
+{
+       struct ldlm_lock *lock;
+       bool cancelled = true;
+       int rc;
+       ENTRY;
+
+       lock = ldlm_handle2lock(&och->och_lease_handle);
+       if (lock != NULL) {
+               lock_res_and_lock(lock);
+               cancelled = ldlm_is_cancel(lock);
+               unlock_res_and_lock(lock);
+               ldlm_lock_put(lock);
+       }
+
+       CDEBUG(D_INODE, "lease for "DFID" broken? %d\n",
+               PFID(&ll_i2info(inode)->lli_fid), cancelled);
+
+       if (!cancelled)
+               ldlm_cli_cancel(&och->och_lease_handle, 0);
+       if (lease_broken != NULL)
+               *lease_broken = cancelled;
+
+       rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(ll_lease_close);
+
 /* Fills the obdo with the attributes for the lsm */
 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
                           struct obd_capa *capa, struct obdo *obdo,
@@ -927,7 +1132,7 @@ out:
         cl_io_fini(env, io);
        /* If any bit been read/written (result != 0), we just return
         * short read/write instead of restart io. */
-       if (result == 0 && io->ci_need_restart) {
+       if ((result == 0 || result == -ENODATA) && io->ci_need_restart) {
                CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
                       iot == CIT_READ ? "read" : "write",
                       file->f_dentry->d_name.name, *ppos, count);
@@ -1620,8 +1825,7 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
         if (!och)
                 GOTO(out, rc = -ENOMEM);
 
-        ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
-                    ll_i2info(inode), it, och);
+       ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
 
         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                        inode, och);
@@ -2218,6 +2422,90 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
                OBD_FREE_PTR(hca);
                RETURN(rc);
        }
+       case LL_IOC_SET_LEASE: {
+               struct ll_inode_info *lli = ll_i2info(inode);
+               struct obd_client_handle *och = NULL;
+               bool lease_broken;
+               fmode_t mode = 0;
+
+               switch (arg) {
+               case F_WRLCK:
+                       if (!(file->f_mode & FMODE_WRITE))
+                               RETURN(-EPERM);
+                       mode = FMODE_WRITE;
+                       break;
+               case F_RDLCK:
+                       if (!(file->f_mode & FMODE_READ))
+                               RETURN(-EPERM);
+                       mode = FMODE_READ;
+                       break;
+               case F_UNLCK:
+                       mutex_lock(&lli->lli_och_mutex);
+                       if (fd->fd_lease_och != NULL) {
+                               och = fd->fd_lease_och;
+                               fd->fd_lease_och = NULL;
+                       }
+                       mutex_unlock(&lli->lli_och_mutex);
+
+                       if (och != NULL) {
+                               mode = och->och_flags &(FMODE_READ|FMODE_WRITE);
+                               rc = ll_lease_close(och, inode, &lease_broken);
+                               if (rc == 0 && lease_broken)
+                                       mode = 0;
+                       } else {
+                               rc = -ENOLCK;
+                       }
+
+                       /* return the type of lease or error */
+                       RETURN(rc < 0 ? rc : (int)mode);
+               default:
+                       RETURN(-EINVAL);
+               }
+
+               CDEBUG(D_INODE, "Set lease with mode %d\n", mode);
+
+               /* apply for lease */
+               och = ll_lease_open(inode, file, mode);
+               if (IS_ERR(och))
+                       RETURN(PTR_ERR(och));
+
+               rc = 0;
+               mutex_lock(&lli->lli_och_mutex);
+               if (fd->fd_lease_och == NULL) {
+                       fd->fd_lease_och = och;
+                       och = NULL;
+               }
+               mutex_unlock(&lli->lli_och_mutex);
+               if (och != NULL) {
+                       /* impossible now that only excl is supported for now */
+                       ll_lease_close(och, inode, &lease_broken);
+                       rc = -EBUSY;
+               }
+               RETURN(rc);
+       }
+       case LL_IOC_GET_LEASE: {
+               struct ll_inode_info *lli = ll_i2info(inode);
+               struct ldlm_lock *lock = NULL;
+
+               rc = 0;
+               mutex_lock(&lli->lli_och_mutex);
+               if (fd->fd_lease_och != NULL) {
+                       struct obd_client_handle *och = fd->fd_lease_och;
+
+                       lock = ldlm_handle2lock(&och->och_lease_handle);
+                       if (lock != NULL) {
+                               lock_res_and_lock(lock);
+                               if (!ldlm_is_cancel(lock))
+                                       rc = och->och_flags &
+                                               (FMODE_READ | FMODE_WRITE);
+                               unlock_res_and_lock(lock);
+                               ldlm_lock_put(lock);
+                       }
+               }
+               mutex_unlock(&lli->lli_och_mutex);
+
+               RETURN(rc);
+       }
        default: {
                int err;
 
@@ -2801,13 +3089,13 @@ out:
 }
 
 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
-                           __u64 ibits)
+                          __u64 ibits)
 {
-        struct inode *inode = dentry->d_inode;
-        int rc;
-        ENTRY;
+       struct inode    *inode = dentry->d_inode;
+       int              rc;
+       ENTRY;
 
-        rc = __ll_inode_revalidate_it(dentry, it, ibits);
+       rc = __ll_inode_revalidate_it(dentry, it, ibits);
        if (rc != 0)
                RETURN(rc);
 
@@ -2817,9 +3105,17 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
                LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
                LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
        } else {
-               rc = ll_glimpse_size(inode);
+               /* In case of restore, the MDT has the right size and has
+                * already send it back without granting the layout lock,
+                * inode is up-to-date so glimpse is useless.
+                * Also to glimpse we need the layout, in case of a running
+                * restore the MDT holds the layout lock so the glimpse will
+                * block up to the end of restore (getattr will block)
+                */
+               if (!(ll_i2info(inode)->lli_flags & LLIF_FILE_RESTORING))
+                       rc = ll_glimpse_size(inode);
        }
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
@@ -3305,6 +3601,7 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
        unlock_res_and_lock(lock);
        /* checking lvb_ready is racy but this is okay. The worst case is
         * that multi processes may configure the file on the same time. */
+
        if (lvb_ready || !reconf) {
                rc = -ENODATA;
                if (lvb_ready) {
@@ -3484,3 +3781,32 @@ again:
 
        RETURN(rc);
 }
+
+/**
+ *  This function send a restore request to the MDT
+ */
+int ll_layout_restore(struct inode *inode)
+{
+       struct hsm_user_request *hur;
+       int                      len, rc;
+       ENTRY;
+
+       len = sizeof(struct hsm_user_request) +
+             sizeof(struct hsm_user_item);
+       OBD_ALLOC(hur, len);
+       if (hur == NULL)
+               RETURN(-ENOMEM);
+
+       hur->hur_request.hr_action = HUA_RESTORE;
+       hur->hur_request.hr_archive_id = 0;
+       hur->hur_request.hr_flags = 0;
+       memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
+              sizeof(hur->hur_user_item[0].hui_fid));
+       hur->hur_user_item[0].hui_extent.length = -1;
+       hur->hur_request.hr_itemcount = 1;
+       rc = obd_iocontrol(LL_IOC_HSM_REQUEST, cl_i2sbi(inode)->ll_md_exp,
+                          len, hur, NULL);
+       OBD_FREE(hur, len);
+       RETURN(rc);
+}
+