X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmds%2Fmds_open.c;h=e76ff949a2354c0ee8e04a9028bec95c3ed91572;hp=fcbf81565b3b5f58aa254f9325b9dcb7261718d6;hb=87c86d444e61e38d6454bba5700ba966dc1ac83d;hpb=7ece95c40263566b0d8a9f9221d9a66605fb37a8 diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index fcbf815..e76ff94 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -247,35 +247,34 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry, struct mds_obd *mds = mds_req2mds(req); struct mds_file_data *mfd; struct mds_body *body; - int error; + int rc = 0; ENTRY; mfd = mds_mfd_new(); if (mfd == NULL) { CERROR("mds: out of memory\n"); - GOTO(cleanup_dentry, error = -ENOMEM); + GOTO(cleanup_dentry, rc = -ENOMEM); } body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body)); if (flags & FMODE_WRITE) { /* FIXME: in recovery, need to pass old epoch here */ - error = mds_get_write_access(mds, dentry->d_inode, 0); - if (error) - GOTO(cleanup_mfd, error); + rc = mds_get_write_access(mds, dentry->d_inode, 0); + if (rc) + GOTO(cleanup_mfd, rc); #ifdef IFILTERDATA_ACTUALLY_USED body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch; -#endif /*IFILTERDATA_ACTUALLY_USED*/ +#endif } else if (flags & FMODE_EXEC) { - error = mds_deny_write_access(mds, dentry->d_inode); - if (error) - GOTO(cleanup_mfd, error); + rc = mds_deny_write_access(mds, dentry->d_inode); + if (rc) + GOTO(cleanup_mfd, rc); } dget(dentry); - /* Mark the file as open to handle open-unlink. */ - + /* mark the file as open to handle open-unlink. */ DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode); mds_orphan_open_inc(dentry->d_inode); UP_WRITE_I_ALLOC_SEM(dentry->d_inode); @@ -290,41 +289,42 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry, mds_mfd_put(mfd); body->handle.cookie = mfd->mfd_handle.h_cookie; - RETURN(mfd); - cleanup_mfd: mds_mfd_put(mfd); mds_mfd_destroy(mfd); cleanup_dentry: - return ERR_PTR(error); + return ERR_PTR(rc); } -static void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm, - struct lov_desc *desc) +static inline void +mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm, + struct lov_desc *desc) { int i; + for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) { ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] = le64_to_cpu(lmm->lmm_objects[i].l_object_id); } } -/* Must be called with i_sem held */ -static int mds_create_objects(struct ptlrpc_request *req, int offset, - struct mds_update_record *rec, - struct mds_obd *mds, struct obd_device *obd, - struct dentry *dchild, void **handle, - obd_id **ids) +/* must be called with i_sem held */ +int +mds_create_objects(struct obd_device *obd, struct ptlrpc_request *req, + int offset, struct mds_update_record *rec, + struct dentry *dchild, void **handle, + obd_id **ids) { - struct obdo *oa = NULL; + struct inode *inode = dchild->d_inode; + struct mds_obd *mds = &obd->u.mds; struct obd_trans_info oti = { 0 }; - struct mds_body *body; struct lov_stripe_md *lsm = NULL; struct lov_mds_md *lmm = NULL; - struct inode *inode = dchild->d_inode; - void *lmm_buf; int rc, lmm_bufsize, lmm_size; + struct obdo *oa = NULL; + struct mds_body *body; + void *lmm_buf; ENTRY; if (rec->ur_flags & MDS_OPEN_DELAY_CREATE || @@ -342,7 +342,7 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, if (*ids == NULL) RETURN(-ENOMEM); oti.oti_objid = *ids; - + /* replay case */ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { LASSERT(id_ino(rec->ur_id2)); @@ -352,19 +352,20 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, LASSERT(lmm); if (*handle == NULL) - *handle = fsfilt_start(obd,inode,FSFILT_OP_CREATE,NULL); + *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL); if (IS_ERR(*handle)) { rc = PTR_ERR(*handle); *handle = NULL; - GOTO(out_oa, rc); + RETURN(rc); } mds_objids_from_lmm(*ids, lmm, &mds->mds_dt_desc); lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0); lmm_bufsize = req->rq_repmsg->buflens[offset]; - LASSERT(lmm_buf); + LASSERT(lmm_buf != NULL); LASSERT(lmm_bufsize >= lmm_size); + memcpy(lmm_buf, lmm, lmm_size); rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size, EA_LOV); @@ -374,23 +375,23 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, } if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_ALLOC_OBDO)) - GOTO(out_ids, rc = -ENOMEM); + RETURN(-ENOMEM); oa = obdo_alloc(); if (oa == NULL) - GOTO(out_ids, rc = -ENOMEM); + RETURN(-ENOMEM); oa->o_mode = S_IFREG | 0600; oa->o_id = inode->i_ino; oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num; oa->o_generation = inode->i_generation; oa->o_uid = 0; /* must have 0 uid / gid on OST */ oa->o_gid = 0; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLTYPE | - OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE | + OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP; oa->o_size = 0; - obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME| - OBD_MD_FLCTIME); + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); if (!(rec->ur_flags & MDS_OPEN_HAS_OBJS)) { /* check if things like lfs setstripe are sending us the ea */ @@ -415,15 +416,19 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, OBD_FREE(lmm, mds->mds_max_mdsize); if (rc) GOTO(out_oa, rc); - } + } + LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS); - rc = obd_create(mds->mds_dt_exp, oa, &lsm, &oti); + oti.oti_flags |= OBD_MODE_CROW; + rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti); + if (rc) { int level = D_ERROR; if (rc == -ENOSPC) level = D_INODE; - CDEBUG(level, "error creating objects for " - "inode %lu: rc = %d\n", + CDEBUG((rc == -ENOSPC ? D_INODE : D_ERROR), + "error creating objects for " + "inode %lu: rc = %d\n", inode->i_ino, rc); if (rc > 0) { CERROR("obd_create returned invalid " @@ -435,16 +440,17 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, } else { rc = obd_iocontrol(OBD_IOC_LOV_SETEA, mds->mds_dt_exp, 0, &lsm, rec->ur_eadata); - if (rc) { + if (rc) GOTO(out_oa, rc); - } + lsm->lsm_object_id = oa->o_id; lsm->lsm_object_gr = oa->o_gr; } if (inode->i_size) { oa->o_size = inode->i_size; - obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME| - OBD_MD_FLMTIME| OBD_MD_FLCTIME| OBD_MD_FLSIZE); + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE); + rc = obd_setattr(mds->mds_dt_exp, oa, lsm, &oti); if (rc) { CERROR("error setting attrs for inode %lu: rc %d\n", @@ -465,7 +471,11 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, rc = obd_packmd(mds->mds_dt_exp, &lmm, lsm); if (!id_ino(rec->ur_id2)) obd_free_memmd(mds->mds_dt_exp, &lsm); - LASSERT(rc >= 0); + if (rc < 0) { + CERROR("cannot pack lsm, err = %d\n", rc); + GOTO(out_oa, rc); + } + lmm_size = rc; body->eadatasize = rc; @@ -474,7 +484,7 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, if (IS_ERR(*handle)) { rc = PTR_ERR(*handle); *handle = NULL; - GOTO(out_ids, rc); + GOTO(out_oa, rc); } rc = fsfilt_set_md(obd, inode, *handle, lmm, @@ -487,19 +497,65 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, memcpy(lmm_buf, lmm, lmm_size); obd_free_diskmd(mds->mds_dt_exp, &lmm); - out_oa: +out_oa: oti_free_cookies(&oti); obdo_free(oa); - out_ids: - if (rc) { - OBD_FREE(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids)); - *ids = NULL; - } - if(lsm) + + if (lsm) obd_free_memmd(mds->mds_dt_exp, &lsm); RETURN(rc); } +int +mds_destroy_object(struct obd_device *obd, + struct inode *inode, int async) +{ + struct mds_obd *mds = &obd->u.mds; + struct lov_mds_md *lmm = NULL; + int rc, lmm_size; + ENTRY; + + LASSERT(inode != NULL); + + if (inode->i_nlink != 0) { + CDEBUG(D_INODE, "attempt to destroy OSS object when " + "i_nlink == %d\n", (int)inode->i_nlink); + RETURN(0); + } + + OBD_ALLOC(lmm, mds->mds_max_mdsize); + if (lmm == NULL) + RETURN(-ENOMEM); + + lmm_size = mds->mds_max_mdsize; + rc = mds_get_md(obd, inode, lmm, &lmm_size, 1, 0); + if (rc < 0) { + CERROR("no stripe info for %lu/%lu inode\n", + (unsigned long)inode->i_ino, + (unsigned long)inode->i_generation); + GOTO(out_free_lmm, rc); + } + + if (rc > 0) { + /* asynchronously unlink objecect on OSS */ + rc = mds_unlink_object(mds, inode, lmm, lmm_size, + NULL, 0, async); + if (rc) { + CERROR("error unlinking object on OSS, " + "err %d\n", rc); + GOTO(out_free_lmm, rc); + } + } else { + CDEBUG(D_INODE, "no stripping info found for inode " + "%lu/%lu\n", (unsigned long)inode->i_ino, + (unsigned long)inode->i_generation); + } + EXIT; +out_free_lmm: + OBD_FREE(lmm, mds->mds_max_mdsize); + return rc; +} + static void reconstruct_open(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *child_lockh) @@ -646,10 +702,10 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, struct mds_body *body, int flags, void **handle, struct mds_update_record *rec, struct ldlm_reply *rep) { - struct mds_obd *mds = mds_req2mds(req); struct obd_device *obd = req->rq_export->exp_obd; + struct mds_obd *mds = mds_req2mds(req); struct mds_file_data *mfd = NULL; - obd_id *ids = NULL; /* object IDs created */ + obd_id *ids = NULL; unsigned mode; int rc = 0; ENTRY; @@ -657,6 +713,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, /* atomically create objects if necessary */ down(&dchild->d_inode->i_sem); mode = dchild->d_inode->i_mode; + if ((S_ISREG(mode) && !(body->valid & OBD_MD_FLEASIZE)) || (S_ISDIR(mode) && !(body->valid & OBD_MD_FLDIREA))) { rc = mds_pack_md(obd, req->rq_repmsg, 2, body, @@ -666,23 +723,25 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, RETURN(rc); } } + if (rec != NULL) { - /* no EA: create objects */ if ((body->valid & OBD_MD_FLEASIZE) && (rec->ur_flags & MDS_OPEN_HAS_EA)) { up(&dchild->d_inode->i_sem); RETURN(-EEXIST); } + if (!(body->valid & OBD_MD_FLEASIZE)) { /* no EA: create objects */ - rc = mds_create_objects(req, 2, rec, mds, obd, + rc = mds_create_objects(obd, req, 2, rec, dchild, handle, &ids); if (rc) { - CERROR("mds_create_objects: rc = %d\n", rc); + CERROR("mds_create_object: rc = %d\n", rc); up(&dchild->d_inode->i_sem); RETURN(rc); } } + if (S_ISREG(dchild->d_inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) { rc = mds_revalidate_lov_ea(obd, dchild->d_inode, @@ -696,6 +755,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, } } } + rc = mds_pack_acl(obd, req->rq_repmsg, 3, body, dchild->d_inode); if (rc < 0) { CERROR("mds_pack_acl: rc = %d\n", rc); @@ -709,6 +769,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME | OBD_MD_FLMTIME); } + up(&dchild->d_inode->i_sem); intent_set_disposition(rep, DISP_OPEN_OPEN); @@ -718,12 +779,12 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd, mfd->mfd_handle.h_cookie); + if (ids != NULL) { mds_dt_update_objids(obd, ids); OBD_FREE(ids, sizeof(*ids) * mds->mds_dt_desc.ld_tgt_count); } - //if (rc) - // mds_mfd_destroy(mfd); + RETURN(rc); } @@ -899,8 +960,8 @@ int mds_open(struct mds_update_record *rec, int offset, LASSERT(id_ino(rec->ur_id2)); - rc = mds_open_by_id(req, rec->ur_id2, body, rec->ur_flags, - rec, rep); + rc = mds_open_by_id(req, rec->ur_id2, body, + rec->ur_flags, rec, rep); if (rc != -ENOENT) { mds_body_do_reverse_map(med, body); RETURN(rc); @@ -999,12 +1060,17 @@ got_child: if (dchild->d_flags & DCACHE_CROSS_REF) { CDEBUG(D_OTHER, "cross reference: "DLID4"\n", OLID4(rec->ur_id1)); + LASSERT(rec->ur_namelen > 1); /* we're gonna acquire LOOKUP lock on the child, * but we have already locked parent and our order * may conflict with enqueue_order_locks(). so, * drop parent lock and acquire both the locks in * common order. bug 6190 */ +#ifdef S_PDIROPS + if (parent_lockh[1].cookie != 0) + ldlm_lock_decref(parent_lockh + 1, update_mode); +#endif ldlm_lock_decref(parent_lockh, parent_mode); l_dput(dchild); l_dput(dparent); @@ -1021,14 +1087,19 @@ got_child: if (rc) GOTO(cleanup, rc); +#ifdef S_PDIROPS + if (parent_lockh[1].cookie != 0) + ldlm_lock_decref(parent_lockh + 1, update_mode); +#endif + ldlm_lock_decref(parent_lockh, LCK_PR); + if (dchild->d_inode || !(dchild->d_flags & DCACHE_CROSS_REF)) { - /* wow! someone unlink and create new one yet */ - CDEBUG(D_OTHER, "nice race, repeat lookup\n"); - ldlm_lock_decref(parent_lockh, parent_mode); - ldlm_lock_decref(child_lockh, LCK_PR); + CDEBUG(D_OTHER, "race: name changed (%p)\n", + dchild->d_inode); + if (dchild->d_inode) + ldlm_lock_decref(child_lockh, LCK_PR); l_dput(dchild); l_dput(dparent); - LASSERT(rec->ur_namelen > 1); GOTO(restart, rc); } @@ -1036,11 +1107,6 @@ got_child: intent_set_disposition(rep, DISP_LOOKUP_POS); intent_set_disposition(rep, DISP_LOOKUP_EXECD); -#ifdef S_PDIROPS - if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(parent_lockh + 1, update_mode); -#endif - ldlm_lock_decref(parent_lockh, parent_mode); if (mea) OBD_FREE(mea, mea_size); l_dput(dchild); @@ -1143,7 +1209,6 @@ got_child: MD_COUNTER_INCREMENT(obd, create); } - down(&dchild->d_inode->i_sem); if (ino) { rc = mds_update_inode_sid(obd, dchild->d_inode, handle, rec->ur_id2); @@ -1165,8 +1230,7 @@ got_child: "rc = %d\n", rc); } } - up(&dchild->d_inode->i_sem); - + if (!(rec->ur_flags & O_EXCL)) { /* bug 3313 */ rc = fsfilt_commit(obd, dchild->d_inode->i_sb, dchild->d_inode, handle, @@ -1236,7 +1300,9 @@ got_child: obd_fail_loc = OBD_FAIL_LDLM_REPLY | OBD_FAIL_ONCE; GOTO(cleanup, rc = -EAGAIN); } - + +#warning "disable opencache lock for CMD2" +#if 0 /* Obtain OPEN lock as well */ policy.l_inodebits.bits |= MDS_INODELOCK_OPEN; @@ -1250,22 +1316,10 @@ got_child: child_mode = LCK_CR; if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { - struct lustre_id sid; - - down(&dchild->d_inode->i_sem); - rc = mds_read_inode_sid(obd, dchild->d_inode, &sid); - up(&dchild->d_inode->i_sem); - if (rc) { - CERROR("Can't read inode self id, " - "inode %lu, rc %d\n", - dchild->d_inode->i_ino, rc); - GOTO(cleanup, rc); - } - /* In case of replay we do not get a lock assuming that the caller has it already */ - child_res_id.name[0] = id_fid(&sid); - child_res_id.name[1] = id_group(&sid); + child_res_id.name[0] = id_fid(&body->id1); + child_res_id.name[1] = id_group(&body->id1); rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, child_res_id, LDLM_IBITS, &policy, @@ -1277,6 +1331,7 @@ got_child: cleanup_phase = 3; } +#endif /* Step 5: mds_open it */ rc = mds_finish_open(req, dchild, body, rec->ur_flags, &handle, @@ -1394,12 +1449,6 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset, int stripe_count = 0; LASSERT(rc == 0); /* mds_put_write_access must have succeeded */ - if (obd->obd_recovering) { - CDEBUG(D_HA, "not remove orphan %s until recovery" - " is over\n", idname); - GOTO(out, rc); - } - CDEBUG(D_HA, "destroying orphan object %s\n", idname); if ((S_ISREG(inode->i_mode) && inode->i_nlink != 1) || @@ -1418,7 +1467,10 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset, idlen); if (IS_ERR(pending_child)) GOTO(cleanup, rc = PTR_ERR(pending_child)); - LASSERT(pending_child->d_inode != NULL); + if (pending_child->d_inode == NULL) { + CERROR("orphan %s has been removed\n", idname); + GOTO(cleanup, rc = 0); + } cleanup_phase = 2; /* dput(pending_child) when finished */ if (S_ISDIR(pending_child->d_inode->i_mode)) { @@ -1458,6 +1510,13 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset, req->rq_repmsg->buflens[2], &lcl) > 0) { reply_body->valid |= OBD_MD_FLCOOKIE; } + + rc = mds_destroy_object(obd, inode, 1); + if (rc) { + CERROR("cannot destroy OSS object on close, err %d\n", + rc); + rc = 0; + } goto out; /* Don't bother updating attrs on unlinked inode */ } @@ -1575,7 +1634,7 @@ int mds_close(struct ptlrpc_request *req, int offset) } if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { - DEBUG_REQ(D_HA, req, "close replay\n"); + DEBUG_REQ(D_HA, req, "close replay"); memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0), lustre_msg_buf(req->rq_reqmsg, offset + 1, 0), req->rq_repmsg->buflens[2]);