From bad3011de38712fe49dd67c7bd9090588368cf3b Mon Sep 17 00:00:00 2001 From: phil Date: Thu, 27 Nov 2003 06:01:11 +0000 Subject: [PATCH] Innocent beginnings of the b_eq merge: the liblustre directory, which accounts for almost half of the patch and doesn't build in kernelspace. --- lustre/liblustre/file.c | 568 ++++++++-------- lustre/liblustre/genlib.sh | 55 ++ lustre/liblustre/llite_lib.c | 403 +++++++++--- lustre/liblustre/llite_lib.h | 224 ++++++- lustre/liblustre/lltest.c | 425 +++++++++--- lustre/liblustre/namei.c | 634 ++++++++++++++++++ lustre/liblustre/rw.c | 915 +++++++++++++++++--------- lustre/liblustre/super.c | 1485 +++++++++++++++++++++++++++++++++--------- 8 files changed, 3632 insertions(+), 1077 deletions(-) create mode 100755 lustre/liblustre/genlib.sh create mode 100644 lustre/liblustre/namei.c diff --git a/lustre/liblustre/file.c b/lustre/liblustre/file.c index a54d7dd..58339c8 100644 --- a/lustre/liblustre/file.c +++ b/lustre/liblustre/file.c @@ -25,7 +25,6 @@ #include #include -#include #include #include #include @@ -46,124 +45,51 @@ void llu_prepare_mdc_op_data(struct mdc_op_data *data, int namelen, int mode) { - struct llu_inode_info *lli1, *lli2; - LASSERT(i1); - - lli1 = llu_i2info(i1); - data->ino1 = lli1->lli_st_ino; - data->gen1 = lli1->lli_st_generation; - data->typ1 = lli1->lli_st_mode & S_IFMT; - data->gid1 = lli1->lli_st_gid; + + ll_i2uctxt(&data->ctxt, i1, i2); + ll_inode2fid(&data->fid1, i1); if (i2) { - lli2 = llu_i2info(i2); - data->ino2 = lli2->lli_st_ino; - data->gen2 = lli2->lli_st_generation; - data->typ2 = lli2->lli_st_mode & S_IFMT; - data->gid2 = lli2->lli_st_gid; - } else - data->ino2 = 0; + ll_inode2fid(&data->fid2, i2); + } data->name = name; data->namelen = namelen; - data->mode = mode; + data->create_mode = mode; data->mod_time = CURRENT_TIME; } -static struct inode *llu_create_node(struct inode *dir, const char *name, - int namelen, const void *data, int datalen, - int mode, __u64 extra, - struct lookup_intent *it) 
+void obdo_refresh_inode(struct inode *dst, + struct obdo *src, + obd_flag valid) { - struct inode *inode; - struct ptlrpc_request *request = NULL; - struct mds_body *body; - time_t time = 123456;//time(NULL); - struct llu_sb_info *sbi = llu_i2sbi(dir); - - if (it && it->it_disposition) { - LBUG(); -#if 0 - ll_invalidate_inode_pages(dir); -#endif - request = it->it_data; - body = lustre_msg_buf(request->rq_repmsg, 1, sizeof(*body)); - } else { - struct mdc_op_data op_data; - struct llu_inode_info *lli_dir = llu_i2info(dir); - int gid = current->fsgid; - int rc; - - if (lli_dir->lli_st_mode & S_ISGID) { - gid = lli_dir->lli_st_gid; - if (S_ISDIR(mode)) - mode |= S_ISGID; - } - - llu_prepare_mdc_op_data(&op_data, dir, NULL, name, namelen, 0); - rc = mdc_create(&sbi->ll_mdc_conn, &op_data, - data, datalen, mode, current->fsuid, gid, - extra, &request); - if (rc) { - inode = (struct inode*)rc; - goto out; - } - body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); - } - - inode = llu_new_inode(dir->i_fs, body->ino, body->mode); - if (!inode) { - /* FIXME more cleanup needed? */ - goto out; - } - - llu_update_inode(inode, body, NULL); - - if (it && it->it_disposition) { - /* We asked for a lock on the directory, but were - * granted a lock on the inode. Since we finally have - * an inode pointer, stuff it in the lock. 
*/ -#if 0 - ll_mdc_lock_set_inode((struct lustre_handle *)it->it_lock_handle, - inode); -#endif - } - - out: - ptlrpc_req_finished(request); - return inode; + struct llu_inode_info *lli = llu_i2info(dst); + valid &= src->o_valid; + + if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) + CDEBUG(D_INODE, "valid %x, cur time %lu/%lu, new %lu/%lu\n", + src->o_valid, LTIME_S(lli->lli_st_mtime), + LTIME_S(lli->lli_st_ctime), + (long)src->o_mtime, (long)src->o_ctime); + + if (valid & OBD_MD_FLATIME && src->o_atime > LTIME_S(lli->lli_st_atime)) + LTIME_S(lli->lli_st_atime) = src->o_atime; + if (valid & OBD_MD_FLMTIME && src->o_mtime > LTIME_S(lli->lli_st_mtime)) + LTIME_S(lli->lli_st_mtime) = src->o_mtime; + if (valid & OBD_MD_FLCTIME && src->o_ctime > LTIME_S(lli->lli_st_ctime)) + LTIME_S(lli->lli_st_ctime) = src->o_ctime; + if (valid & OBD_MD_FLSIZE && src->o_size > lli->lli_st_size) + lli->lli_st_size = src->o_size; + /* optimum IO size */ + if (valid & OBD_MD_FLBLKSZ) + lli->lli_st_blksize = src->o_blksize; + /* allocation of space */ + if (valid & OBD_MD_FLBLOCKS && src->o_blocks > lli->lli_st_blocks) + lli->lli_st_blocks = src->o_blocks; } -int llu_create(struct inode *dir, struct pnode_base *pnode, int mode) -{ - struct inode *inode; #if 0 - int rc = 0; - - CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu,intent=%s\n", - dentry->d_name.name, dir->i_ino, LL_IT2STR(dentry->d_it)); - - it = dentry->d_it; - - rc = ll_it_open_error(DISP_OPEN_CREATE, it); - if (rc) { - LL_GET_INTENT(dentry, it); - ptlrpc_req_finished(it->it_data); - RETURN(rc); - } -#endif - inode = llu_create_node(dir, pnode->pb_name.name, pnode->pb_name.len, - NULL, 0, mode, 0, NULL); - - if (IS_ERR(inode)) - RETURN(PTR_ERR(inode)); - - pnode->pb_ino = inode; - - return 0; -} - static int llu_create_obj(struct lustre_handle *conn, struct inode *inode, struct lov_stripe_md *lsm) { @@ -173,6 +99,7 @@ static int llu_create_obj(struct lustre_handle *conn, struct inode *inode, struct obdo *oa; struct iattr iattr; 
struct mdc_op_data op_data; + struct obd_trans_info oti = { 0 }; int rc, err, lmm_size = 0;; ENTRY; @@ -180,18 +107,24 @@ static int llu_create_obj(struct lustre_handle *conn, struct inode *inode, if (!oa) RETURN(-ENOMEM); + LASSERT(S_ISREG(inode->i_mode)); oa->o_mode = S_IFREG | 0600; oa->o_id = lli->lli_st_ino; + oa->o_generation = lli->lli_st_generation; /* Keep these 0 for now, because chown/chgrp does not change the * ownership on the OST, and we don't want to allow BA OST NFS * users to access these objects by mistake. */ oa->o_uid = 0; oa->o_gid = 0; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE | - OBD_MD_FLUID | OBD_MD_FLGID; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLTYPE | + OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID; - rc = obd_create(conn, oa, &lsm, NULL); + obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME| + OBD_MD_FLCTIME | + (llu_i2info(inode)->lli_st_size ? OBD_MD_FLSIZE : 0)); + + rc = obd_create(conn, oa, &lsm, &oti); if (rc) { CERROR("error creating objects for inode %lu: rc = %d\n", lli->lli_st_ino, rc); @@ -201,6 +134,7 @@ static int llu_create_obj(struct lustre_handle *conn, struct inode *inode, } GOTO(out_oa, rc); } + obdo_refresh_inode(inode, oa, OBD_MD_FLBLKSZ); LASSERT(lsm && lsm->lsm_object_id); rc = obd_packmd(conn, &lmm, lsm); @@ -216,7 +150,8 @@ static int llu_create_obj(struct lustre_handle *conn, struct inode *inode, llu_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); rc = mdc_setattr(&llu_i2sbi(inode)->ll_mdc_conn, &op_data, - &iattr, lmm, lmm_size, &req); + &iattr, lmm, lmm_size, oti.oti_logcookies, + oti.oti_numcookies * sizeof(oti.oti_onecookie), &req); ptlrpc_req_finished(req); obd_free_diskmd(conn, &lmm); @@ -230,16 +165,19 @@ static int llu_create_obj(struct lustre_handle *conn, struct inode *inode, GOTO(out_destroy, rc); } lli->lli_smd = lsm; + lli->lli_maxbytes = lsm->lsm_maxbytes; EXIT; out_oa: + oti_free_cookies(&oti); obdo_free(oa); return rc; out_destroy: - 
obdo_from_inode(oa, inode, OBD_MD_FLTYPE); oa->o_id = lsm->lsm_object_id; - oa->o_valid |= OBD_MD_FLID; + oa->o_valid = OBD_MD_FLID; + obdo_from_inode(oa, inode, OBD_MD_FLTYPE); + err = obd_destroy(conn, oa, lsm, NULL); obd_free_memmd(conn, &lsm); if (err) { @@ -248,33 +186,42 @@ out_destroy: } goto out_oa; } +#endif -/* FIXME currently no "it" passed in */ static int llu_local_open(struct llu_inode_info *lli, struct lookup_intent *it) { + struct ptlrpc_request *req = it->d.lustre.it_data; struct ll_file_data *fd; -#if 0 - struct ptlrpc_request *req = it->it_data; - struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); + struct mds_body *body; ENTRY; -#endif + + body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body)); + LASSERT (body != NULL); /* reply already checked out */ + LASSERT_REPSWABBED (req, 1); /* and swabbed down */ + + /* already opened? */ + if (lli->lli_open_count++) + RETURN(0); + LASSERT(!lli->lli_file_data); - fd = malloc(sizeof(struct ll_file_data)); + OBD_ALLOC(fd, sizeof(*fd)); /* We can't handle this well without reorganizing ll_file_open and * ll_mdc_close, so don't even try right now. 
*/ LASSERT(fd != NULL); memset(fd, 0, sizeof(*fd)); -#if 0 + memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle)); - fd->fd_mds_och.och_req = it->it_data; -#endif + fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC; lli->lli_file_data = fd; + mdc_set_open_replay_data(&fd->fd_mds_och, it->d.lustre.it_data); + RETURN(0); } +#if 0 static int llu_osc_open(struct lustre_handle *conn, struct inode *inode, struct lov_stripe_md *lsm) { @@ -294,7 +241,7 @@ static int llu_osc_open(struct lustre_handle *conn, struct inode *inode, if (rc) GOTO(out, rc); -// file->f_flags &= ~O_LOV_DELAY_CREATE; + /* file->f_flags &= ~O_LOV_DELAY_CREATE; */ obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME); @@ -303,40 +250,55 @@ out: obdo_free(oa); return rc; } +#endif + -static int llu_file_open(struct inode *inode) +int llu_iop_open(struct pnode *pnode, int flags, mode_t mode) { -#if 0 - struct llu_sb_info *sbi = llu_i2sbi(inode); -#endif + struct inode *inode = pnode->p_base->pb_ino; struct llu_inode_info *lli = llu_i2info(inode); - struct lustre_handle *conn = llu_i2obdconn(inode); + struct ll_file_data *fd; + struct ptlrpc_request *request; struct lookup_intent *it; struct lov_stripe_md *lsm; int rc = 0; + ENTRY; + + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", lli->lli_st_ino); + LL_GET_INTENT(inode, it); + if (!it->d.lustre.it_disposition) { #if 0 - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", inode->i_ino); - LL_GET_INTENT(file->f_dentry, it); - rc = ll_it_open_error(DISP_OPEN_OPEN, it); - if (rc) - RETURN(rc); + struct lookup_intent oit = { .it_op = IT_OPEN, + .it_flags = file->f_flags }; + it = &oit; + rc = ll_intent_file_open(file, NULL, 0, it); + if (rc) + GOTO(out_release, rc); #endif + CERROR("fixme!!\n"); + } + + rc = it_open_error(DISP_OPEN_OPEN, it); + if (rc) + GOTO(out_release, rc); + rc = llu_local_open(lli, it); if (rc) LBUG(); -#if 0 - mdc_set_open_replay_data(&((struct ll_file_data *) - file->private_data)->fd_mds_och); -#endif + + if 
(!S_ISREG(lli->lli_st_mode)) + GOTO(out_release, rc = 0); + + fd = lli->lli_file_data; + lsm = lli->lli_smd; if (lsm == NULL) { -#if 0 - if (file->f_flags & O_LOV_DELAY_CREATE) { - CDEBUG(D_INODE, "delaying object creation\n"); - RETURN(0); + if (fd->fd_flags & O_LOV_DELAY_CREATE) { + CDEBUG(D_INODE, "object creation was delayed\n"); + GOTO(out_release, rc); } -#endif +#if 0 if (!lli->lli_smd) { rc = llu_create_obj(conn, inode, NULL); if (rc) @@ -346,132 +308,164 @@ static int llu_file_open(struct inode *inode) lli->lli_st_ino); } lsm = lli->lli_smd; +#endif } + fd->fd_flags &= ~O_LOV_DELAY_CREATE; - rc = llu_osc_open(conn, inode, lsm); - if (rc) - GOTO(out_close, rc); - RETURN(0); + out_release: + request = it->d.lustre.it_data; + ptlrpc_req_finished(request); - out_close: -// ll_mdc_close(&sbi->ll_mdc_conn, inode, file); - return rc; + it->it_op_release(it); + OBD_FREE(it, sizeof(*it)); + + RETURN(rc); } -int llu_iop_open(struct pnode *pnode, int flags, mode_t mode) +int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir) { - struct inode *dir = pnode->p_parent->p_base->pb_ino; + struct mds_body *body; + struct lov_mds_md *eadata; + struct lov_stripe_md *lsm = NULL; + struct obd_trans_info oti = { 0 }; + struct obdo *oa; int rc; - /* FIXME later we must add the ldlm here */ + ENTRY; - LASSERT(dir); + /* req is swabbed so this is safe */ + body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); - /* libsysio forgot to guarentee mode is valid XXX */ - mode |= S_IFREG; + if (!(body->valid & OBD_MD_FLEASIZE)) + RETURN(0); - if (!pnode->p_base->pb_ino) { - rc = llu_create(dir, pnode->p_base, mode); - if (rc) - return rc; + if (body->eadatasize == 0) { + CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n"); + GOTO(out, rc = -EPROTO); } - LASSERT(pnode->p_base->pb_ino); - return llu_file_open(pnode->p_base->pb_ino); -} + /* The MDS sent back the EA because we unlinked the last reference + * to this file. 
Use this EA to unlink the objects on the OST. + * It's opaque so we don't swab here; we leave it to obd_unpackmd() to + * check it is complete and sensible. */ + eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL); + LASSERT(eadata != NULL); + if (eadata == NULL) { + CERROR("Can't unpack MDS EA data\n"); + GOTO(out, rc = -EPROTO); + } + + rc = obd_unpackmd(llu_i2obdexp(dir), &lsm, eadata, body->eadatasize); + if (rc < 0) { + CERROR("obd_unpackmd: %d\n", rc); + GOTO(out, rc); + } + LASSERT(rc >= sizeof(*lsm)); + + oa = obdo_alloc(); + if (oa == NULL) + GOTO(out_free_memmd, rc = -ENOMEM); + oa->o_id = lsm->lsm_object_id; + oa->o_mode = body->mode & S_IFMT; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE; + + if (body->valid & OBD_MD_FLCOOKIE) { + oa->o_valid |= OBD_MD_FLCOOKIE; + oti.oti_logcookies = + lustre_msg_buf(request->rq_repmsg, 2, + sizeof(struct llog_cookie) * + lsm->lsm_stripe_count); + if (oti.oti_logcookies == NULL) { + oa->o_valid &= ~OBD_MD_FLCOOKIE; + body->valid &= ~OBD_MD_FLCOOKIE; + } + } -static int llu_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode) + rc = obd_destroy(llu_i2obdexp(dir), oa, lsm, &oti); + obdo_free(oa); + if (rc) + CERROR("obd destroy objid 0x"LPX64" error %d\n", + lsm->lsm_object_id, rc); + out_free_memmd: + obd_free_memmd(llu_i2obdexp(dir), &lsm); + out: + return rc; +} + +int llu_mdc_close(struct obd_export *mdc_exp, struct inode *inode) { struct llu_inode_info *lli = llu_i2info(inode); struct ll_file_data *fd = lli->lli_file_data; struct ptlrpc_request *req = NULL; - unsigned long flags; - struct obd_import *imp; - int rc; - - /* FIXME add following code later FIXME */ -#if 0 - /* Complete the open request and remove it from replay list */ - rc = mdc_close(&ll_i2sbi(inode)->ll_mdc_conn, lli->lli_st_ino, - inode->i_mode, &fd->fd_mds_och.och_fh, &req); - if (rc) - CERROR("inode %lu close failed: rc = %d\n", - lli->lli_st_ino, rc); - - imp = fd->fd_mds_och.och_req->rq_import; - LASSERT(imp != NULL); - 
spin_lock_irqsave(&imp->imp_lock, flags); - - DEBUG_REQ(D_HA, fd->fd_mds_och.och_req, "matched open req %p", - fd->fd_mds_och.och_req); - - /* We held on to the request for replay until we saw a close for that - * file. Now that we've closed it, it gets replayed on the basis of - * its transno only. */ - spin_lock (&fd->fd_mds_och.och_req->rq_lock); - fd->fd_mds_och.och_req->rq_replay = 0; - spin_unlock (&fd->fd_mds_och.och_req->rq_lock); - - if (fd->fd_mds_och.och_req->rq_transno) { - /* This open created a file, so it needs replay as a - * normal transaction now. Our reference to it now - * effectively owned by the imp_replay_list, and it'll - * be committed just like other transno-having - * requests from here on out. */ - - /* We now retain this close request, so that it is - * replayed if the open is replayed. We duplicate the - * transno, so that we get freed at the right time, - * and rely on the difference in xid to keep - * everything ordered correctly. - * - * But! If this close was already given a transno - * (because it caused real unlinking of an - * open-unlinked file, f.e.), then we'll be ordered on - * the basis of that and we don't need to do anything - * magical here. */ - if (!req->rq_transno) { - req->rq_transno = fd->fd_mds_och.och_req->rq_transno; - ptlrpc_retain_replayable_request(req, imp); - } - spin_unlock_irqrestore(&imp->imp_lock, flags); + struct obd_client_handle *och = &fd->fd_mds_och; + struct obdo obdo; + int rc, valid; + ENTRY; - /* Should we free_committed now? we always free before - * replay, so it's probably a wash. We could check to - * see if the fd_req should already be committed, in - * which case we can avoid the whole retain_replayable - * dance. 
*/ + valid = OBD_MD_FLID; + if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) + valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + + memset(&obdo, 0, sizeof(obdo)); + obdo.o_id = lli->lli_st_ino; + obdo.o_mode = lli->lli_st_mode; + obdo.o_size = lli->lli_st_size; + obdo.o_blocks = lli->lli_st_blocks; + if (0 /* ll_is_inode_dirty(inode) */) { + obdo.o_flags = MDS_BFLAG_UNCOMMITTED_WRITES; + valid |= OBD_MD_FLFLAGS; + } + obdo.o_valid = valid; + rc = mdc_close(mdc_exp, &obdo, och, &req); + if (rc == EAGAIN) { + /* We are the last writer, so the MDS has instructed us to get + * the file size and any write cookies, then close again. */ + //ll_queue_done_writing(inode); + rc = 0; + } else if (rc) { + CERROR("inode %lu close failed: rc = %d\n", lli->lli_st_ino, rc); } else { - /* No transno means that we can just drop our ref. */ - spin_unlock_irqrestore(&imp->imp_lock, flags); + rc = llu_objects_destroy(req, inode); + if (rc) + CERROR("inode %lu ll_objects destroy: rc = %d\n", + lli->lli_st_ino, rc); } - ptlrpc_req_finished(fd->fd_mds_och.och_req); - /* Do this after the fd_req->rq_transno check, because we don't want - * to bounce off zero references. */ + mdc_clear_open_replay_data(och); ptlrpc_req_finished(req); - fd->fd_mds_och.och_fh.cookie = DEAD_HANDLE_MAGIC; -#endif + och->och_fh.cookie = DEAD_HANDLE_MAGIC; lli->lli_file_data = NULL; - free(fd); + OBD_FREE(fd, sizeof(*fd)); - RETURN(-abs(rc)); + RETURN(rc); } -static int llu_file_release(struct inode *inode) +int llu_file_release(struct inode *inode) { + struct ll_file_data *fd; struct llu_sb_info *sbi = llu_i2sbi(inode); struct llu_inode_info *lli = llu_i2info(inode); - struct lov_stripe_md *lsm = lli->lli_smd; - struct ll_file_data *fd; - struct obdo oa; int rc = 0, rc2; + ENTRY; + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%lu\n", lli->lli_st_ino, + lli->lli_st_generation); + + /* FIXME need add this check later. how to find the root pnode? 
*/ +#if 0 + /* don't do anything for / */ + if (inode->i_sb->s_root == file->f_dentry) + RETURN(0); +#endif + /* still opened by others? */ + if (--lli->lli_open_count) + RETURN(0); + fd = lli->lli_file_data; if (!fd) /* no process opened the file after an mcreate */ - RETURN(rc = 0); + RETURN(0); - rc2 = llu_mdc_close(&sbi->ll_mdc_conn, inode); + rc2 = llu_mdc_close(sbi->ll_mdc_exp, inode); if (rc2 && !rc) rc = rc2; @@ -480,58 +474,102 @@ static int llu_file_release(struct inode *inode) int llu_iop_close(struct inode *inode) { - return llu_file_release(inode); + int rc; + + rc = llu_file_release(inode); + if (!llu_i2info(inode)->lli_open_count) + llu_i2info(inode)->lli_stale_flag = 1; + return rc; } int llu_iop_ipreadv(struct inode *ino, - struct io_arguments *ioargs, - struct ioctx **ioctxp) + struct ioctx *ioctx) { - struct ioctx *ioctx; - - if (!ioargs->ioarg_iovlen) - return 0; - if (ioargs->ioarg_iovlen < 0) - return -EINVAL; + ENTRY; - ioctx = _sysio_ioctx_new(ino, ioargs); - if (!ioctx) - return -ENOMEM; + if (!ioctx->ioctx_iovlen) + RETURN(0); + if (ioctx->ioctx_iovlen < 0) + RETURN(-EINVAL); - ioctx->ioctx_cc = llu_file_read(ino, + ioctx->ioctx_private = llu_file_read(ino, ioctx->ioctx_iovec, ioctx->ioctx_iovlen, ioctx->ioctx_offset); - if (ioctx->ioctx_cc < 0) - ioctx->ioctx_errno = ioctx->ioctx_cc; + if (IS_ERR(ioctx->ioctx_private)) + return (PTR_ERR(ioctx->ioctx_private)); - *ioctxp = ioctx; - return 0; + RETURN(0); } int llu_iop_ipwritev(struct inode *ino, - struct io_arguments *ioargs, - struct ioctx **ioctxp) + struct ioctx *ioctx) { - struct ioctx *ioctx; - - if (!ioargs->ioarg_iovlen) - return 0; - if (ioargs->ioarg_iovlen < 0) - return -EINVAL; + ENTRY; - ioctx = _sysio_ioctx_new(ino, ioargs); - if (!ioctx) - return -ENOMEM; + if (!ioctx->ioctx_iovlen) + RETURN(0); + if (ioctx->ioctx_iovlen < 0) + RETURN(-EINVAL); - ioctx->ioctx_cc = llu_file_write(ino, + ioctx->ioctx_private = llu_file_write(ino, ioctx->ioctx_iovec, ioctx->ioctx_iovlen, 
ioctx->ioctx_offset); - if (ioctx->ioctx_cc < 0) - ioctx->ioctx_errno = ioctx->ioctx_cc; + if (IS_ERR(ioctx->ioctx_private)) + return (PTR_ERR(ioctx->ioctx_private)); - *ioctxp = ioctx; - return 0; + RETURN(0); +} + +/* this isn't where truncate starts. roughly: + * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate + * we grab the lock back in setattr_raw to avoid races. */ +static void llu_truncate(struct inode *inode) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct obdo oa = {0}; + int err; + ENTRY; + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%lu\n", lli->lli_st_ino, + lli->lli_st_generation); + + if (!lsm) { + CERROR("truncate on inode %lu with no objects\n", lli->lli_st_ino); + EXIT; + return; + } + + oa.o_id = lsm->lsm_object_id; + oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE|OBD_MD_FLMODE|OBD_MD_FLATIME| + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + + CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n", + oa.o_id, lli->lli_st_size); + + /* truncate == punch from new size to absolute end of file */ + err = obd_punch(llu_i2obdexp(inode), &oa, lsm, lli->lli_st_size, + OBD_OBJECT_EOF, NULL); + if (err) + CERROR("obd_truncate fails (%d) ino %lu\n", err, lli->lli_st_ino); + else + obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | + OBD_MD_FLATIME | OBD_MD_FLMTIME | + OBD_MD_FLCTIME); + + EXIT; + return; } +int llu_vmtruncate(struct inode * inode, loff_t offset) +{ + struct llu_inode_info *lli = llu_i2info(inode); + + lli->lli_st_size = offset; + + llu_truncate(inode); + + return 0; +} diff --git a/lustre/liblustre/genlib.sh b/lustre/liblustre/genlib.sh new file mode 100755 index 0000000..52b4b88 --- /dev/null +++ b/lustre/liblustre/genlib.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +# +# This script is to generate lib lustre library as a whole. It will leave +# two files on current directory: liblustre.a and liblustre.so. 
+# Integrate them into Makefile.am later +# + +AR=/usr/bin/ar +LD=/usr/bin/ld + +CWD=`pwd` + +SYSIO=$1 + +ALL_OBJS= + +build_obj_list() { + _objs=`$AR -t $1/$2` + for _lib in $_objs; do + ALL_OBJS=$ALL_OBJS"$1/$_lib "; + done; +} + +# lustre components libs +build_obj_list . libllite.a +build_obj_list ../lov liblov.a +build_obj_list ../obdecho libobdecho.a +build_obj_list ../osc libosc.a +build_obj_list ../mdc libmdc.a +build_obj_list ../ldlm libldlm.a +build_obj_list ../ptlrpc libptlrpc.a +build_obj_list ../obdclass liblustreclass.a +build_obj_list ../lvfs liblvfs.a + +# portals components libs +build_obj_list ../portals/utils libptlctl.a +build_obj_list ../portals/unals libtcpnal.a +build_obj_list ../portals/portals libportals.a + +# libsysio components libs +build_obj_list $SYSIO/drivers/native libsysio_native.a +build_obj_list $SYSIO/drivers/sockets libsysio_sockets.a +build_obj_list $SYSIO/src libsysio.a +build_obj_list $SYSIO/dev/stdfd libsysio_stdfd.a + + +# create static lib +rm -f $CWD/liblustre.a +$AR -r $CWD/liblustre.a $ALL_OBJS + +# create shared lib +rm -f $CWD/liblustre.so +$LD -shared -o $CWD/liblustre.so -init __liblustre_setup_ -fini __liblustre_cleanup_ \ + $ALL_OBJS -lpthread diff --git a/lustre/liblustre/llite_lib.c b/lustre/liblustre/llite_lib.c index e829bfa..e8b48c5 100644 --- a/lustre/liblustre/llite_lib.c +++ b/lustre/liblustre/llite_lib.c @@ -21,27 +21,25 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
*/ -#define DEBUG_SUBSYSTEM S_LLITE - #include #include -#include #include #include #include +#include +#include +#include + #include #include #include #include #include -#include -#include -#include - #include /* needed for ptpctl.h */ #include /* needed for parse_dump */ +#include #include "llite_lib.h" @@ -54,7 +52,13 @@ struct obd_class_user_state ocus; ptl_handle_ni_t * kportal_get_ni (int nal) { - return &tcpnal_ni; + switch (nal) + { + case SOCKNAL: + return &tcpnal_ni; + default: + return NULL; + } } inline void @@ -67,10 +71,6 @@ struct ldlm_namespace; struct ldlm_res_id; struct obd_import; -extern int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, int flags); -extern int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags); -extern int ldlm_replay_locks(struct obd_import *imp); - void *inter_module_get(char *arg) { if (!strcmp(arg, "tcpnal_ni")) @@ -85,6 +85,29 @@ void *inter_module_get(char *arg) return NULL; } +/* XXX move to proper place */ +char *portals_nid2str(int nal, ptl_nid_t nid, char *str) +{ + switch(nal){ + case TCPNAL: + /* userspace NAL */ + case SOCKNAL: + sprintf(str, "%u:%d.%d.%d.%d", (__u32)(nid >> 32), + HIPQUAD(nid)); + break; + case QSWNAL: + case GMNAL: + case IBNAL: + case TOENAL: + case SCIMACNAL: + sprintf(str, "%u:%u", (__u32)(nid >> 32), (__u32)nid); + break; + default: + return NULL; + } + return str; +} + void init_current(char *comm) { current = malloc(sizeof(*current)); @@ -95,15 +118,26 @@ void init_current(char *comm) current->pid = getpid(); current->fsuid = 0; current->fsgid = 0; - current->cap_effective = 0; + current->cap_effective = -1; memset(¤t->pending, 0, sizeof(current->pending)); } +/* FIXME */ +void generate_random_uuid(unsigned char uuid_out[16]) +{ + int *arr = (int*)uuid_out; + int i; + + for (i = 0; i < sizeof(uuid_out)/sizeof(int); i++) + arr[i] = rand(); +} + ptl_nid_t tcpnal_mynid; int init_lib_portals() { int rc; + ENTRY; PtlInit(); rc = 
PtlNIInit(procbridge_interface, 0, 0, 0, &tcpnal_ni); @@ -113,58 +147,36 @@ int init_lib_portals() RETURN (rc); } PtlNIDebug(tcpnal_ni, ~0); - return rc; + RETURN(rc); } -extern int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd, unsigned long arg); +int +kportal_nal_cmd(struct portals_cfg *pcfg) +{ + /* handle portals command if we want */ + return 0; +} -struct mount_option_s mount_option = {NULL, NULL}; +extern int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd, unsigned long arg); -/* FIXME simple arg parser FIXME */ -void parse_mount_options(void *arg) +int lib_ioctl_nalcmd(int dev_id, int opc, void * ptr) { - char *buf = NULL; - struct obd_ioctl_data *data; - char *ptr, *comma, *eq, **tgt, *v; - int len; - - if (obd_ioctl_getdata(&buf, &len, arg)) { - CERROR("OBD ioctl: data error\n"); - return; - } - data = (struct obd_ioctl_data *)buf; - ptr = data->ioc_inlbuf1; - printf("mount option: %s\n", ptr); - - while (ptr) { - eq = strchr(ptr, '='); - if (!eq) - return; - - *eq = 0; - if (!strcmp("osc", ptr)) - tgt = &mount_option.osc_uuid; - else if (!strcmp("mdc", ptr)) - tgt = &mount_option.mdc_uuid; - else { - printf("Unknown mount option %s\n", ptr); - return; + struct portal_ioctl_data *ptldata; + + if (opc == IOC_PORTAL_NAL_CMD) { + ptldata = (struct portal_ioctl_data *) ptr; + + if (ptldata->ioc_nal_cmd == NAL_CMD_REGISTER_MYNID) { + tcpnal_mynid = ptldata->ioc_nid; + printf("mynid: %u.%u.%u.%u\n", + (unsigned)(tcpnal_mynid>>24) & 0xFF, + (unsigned)(tcpnal_mynid>>16) & 0xFF, + (unsigned)(tcpnal_mynid>>8) & 0xFF, + (unsigned)(tcpnal_mynid) & 0xFF); } - - v = eq + 1; - comma = strchr(v, ','); - if (comma) { - *comma = 0; - ptr = comma + 1; - } else - ptr = NULL; - - *tgt = malloc(strlen(v)+1); - strcpy(*tgt, v); } - if (buf) - obd_ioctl_freedata(buf, len); + return (0); } int lib_ioctl(int dev_id, int opc, void * ptr) @@ -174,18 +186,14 @@ int lib_ioctl(int dev_id, int opc, void * ptr) if (dev_id == OBD_DEV_ID) { 
struct obd_ioctl_data *ioc = ptr; - if (opc == OBD_IOC_MOUNTOPT) { - parse_mount_options(ptr); - return 0; - } + //XXX hack!!! + ioc->ioc_plen1 = ioc->ioc_inllen1; + ioc->ioc_pbuf1 = ioc->ioc_bulk; + //XXX - rc = class_handle_ioctl(&ocus, opc, (unsigned long)ptr); + rc = class_handle_ioctl(&ocus, opc, (unsigned long)ptr); - /* you _may_ need to call obd_ioctl_unpack or some - other verification function if you want to use ioc - directly here */ - printf ("processing ioctl cmd: %x buf len: %d, rc %d\n", - opc, ioc->ioc_len, rc); + printf ("proccssing ioctl cmd: %x, rc %d\n", opc, rc); if (rc) return rc; @@ -193,11 +201,22 @@ int lib_ioctl(int dev_id, int opc, void * ptr) return (0); } -int lllib_init(char *arg) +int lllib_init(char *dumpfile) { - tcpnal_mynid = ntohl(inet_addr(arg)); INIT_LIST_HEAD(&ocus.ocus_conns); + if (!g_zconf) { + /* this parse only get my nid from config file + * before initialize portals + */ + if (parse_dump(dumpfile, lib_ioctl_nalcmd)) + return -1; + } else { + /* XXX need setup mynid before tcpnal initialize */ + tcpnal_mynid = ((uint64_t)getpid() << 32) | time(0); + printf("set tcpnal mynid: %016llx\n", tcpnal_mynid); + } + init_current("dummy"); if (init_obdclass() || init_lib_portals() || @@ -208,19 +227,251 @@ int lllib_init(char *arg) osc_init()) return -1; - if (parse_dump("/tmp/DUMP_FILE", lib_ioctl)) + if (!g_zconf && parse_dump(dumpfile, lib_ioctl)) return -1; return _sysio_fssw_register("llite", &llu_fssw_ops); } + +#if 0 +static void llu_check_request() +{ + liblustre_check_events(0); + liblustre_check_services(); +} +#endif -/* FIXME */ -void generate_random_uuid(unsigned char uuid_out[16]) +int liblustre_process_log(struct config_llog_instance *cfg) { - int *arr = (int*)uuid_out; - int i; + struct lustre_cfg lcfg; + char *peer = "MDS_PEER_UUID"; + struct obd_device *obd; + struct lustre_handle mdc_conn = {0, }; + struct obd_export *exp; + char *name = "mdc_dev"; + class_uuid_t uuid; + struct obd_uuid mdc_uuid; + struct 
llog_ctxt *ctxt; + ptl_nid_t nid = 0; + int nal, err, rc = 0; + ENTRY; + + generate_random_uuid(uuid); + class_uuid_unparse(uuid, &mdc_uuid); + + if (ptl_parse_nid(&nid, g_zconf_mdsnid)) { + CERROR("Can't parse NID %s\n", g_zconf_mdsnid); + RETURN(-EINVAL); + } + nal = ptl_name2nal("tcp"); + if (nal <= 0) { + CERROR("Can't parse NAL tcp\n"); + RETURN(-EINVAL); + } + LCFG_INIT(lcfg, LCFG_ADD_UUID, NULL); + lcfg.lcfg_nid = nid; + lcfg.lcfg_inllen1 = strlen(peer) + 1; + lcfg.lcfg_inlbuf1 = peer; + lcfg.lcfg_nal = nal; + err = class_process_config(&lcfg); + if (err < 0) + GOTO(out, err); + + LCFG_INIT(lcfg, LCFG_ATTACH, name); + lcfg.lcfg_inlbuf1 = "mdc"; + lcfg.lcfg_inllen1 = strlen(lcfg.lcfg_inlbuf1) + 1; + lcfg.lcfg_inlbuf2 = mdc_uuid.uuid; + lcfg.lcfg_inllen2 = strlen(lcfg.lcfg_inlbuf2) + 1; + err = class_process_config(&lcfg); + if (err < 0) + GOTO(out_del_uuid, err); + + LCFG_INIT(lcfg, LCFG_SETUP, name); + lcfg.lcfg_inlbuf1 = g_zconf_mdsname; + lcfg.lcfg_inllen1 = strlen(lcfg.lcfg_inlbuf1) + 1; + lcfg.lcfg_inlbuf2 = peer; + lcfg.lcfg_inllen2 = strlen(lcfg.lcfg_inlbuf2) + 1; + err = class_process_config(&lcfg); + if (err < 0) + GOTO(out_detach, err); + + obd = class_name2obd(name); + if (obd == NULL) + GOTO(out_cleanup, err = -EINVAL); + + err = obd_connect(&mdc_conn, obd, &mdc_uuid); + if (err) { + CERROR("cannot connect to %s: rc = %d\n", + g_zconf_mdsname, err); + GOTO(out_cleanup, err); + } + + exp = class_conn2export(&mdc_conn); + + ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT]; + rc = class_config_parse_llog(ctxt, g_zconf_profile, cfg); + if (rc) { + CERROR("class_config_parse_llog failed: rc = %d\n", rc); + } - for (i = 0; i < sizeof(uuid_out)/sizeof(int); i++) - arr[i] = rand(); + err = obd_disconnect(exp, 0); + +out_cleanup: + LCFG_INIT(lcfg, LCFG_CLEANUP, name); + err = class_process_config(&lcfg); + if (err < 0) + GOTO(out, err); + +out_detach: + LCFG_INIT(lcfg, LCFG_DETACH, name); + err = class_process_config(&lcfg); + if (err < 0) + 
GOTO(out, err); + +out_del_uuid: + LCFG_INIT(lcfg, LCFG_DEL_UUID, name); + lcfg.lcfg_inllen1 = strlen(peer) + 1; + lcfg.lcfg_inlbuf1 = peer; + err = class_process_config(&lcfg); + +out: + if (rc == 0) + rc = err; + + RETURN(rc); +} + +static void sighandler_USR1(int signum) +{ + /* do nothing */ +} + +/* parse host:/mdsname/profile string */ +int ll_parse_mount_target(const char *target, char **mdsnid, + char **mdsname, char **profile) +{ + static char buf[256]; + char *s; + + buf[255] = 0; + strncpy(buf, target, 255); + + if ((s = strchr(buf, ':'))) { + *mdsnid = buf; + *s = '\0'; + + while (*++s == '/') + ; + *mdsname = s; + if ((s = strchr(*mdsname, '/'))) { + *s = '\0'; + *profile = s + 1; + return 0; + } + } + + return -1; +} + +/* env variables */ +#define ENV_LUSTRE_MNTPNT "LIBLUSTRE_MOUNT_POINT" +#define ENV_LUSTRE_MNTTGT "LIBLUSTRE_MOUNT_TARGET" +#define ENV_LUSTRE_DUMPFILE "LIBLUSTRE_DUMPFILE" + +extern int _sysio_native_init(); + +/* global variables */ +int g_zconf = 0; /* zeroconf or dumpfile */ +char *g_zconf_mdsname = NULL; /* mdsname, for zeroconf */ +char *g_zconf_mdsnid = NULL; /* mdsnid, for zeroconf */ +char *g_zconf_profile = NULL; /* profile, for zeroconf */ + + +void __liblustre_setup_(void) +{ + char *lustre_path = NULL; + char *target = NULL; + char *dumpfile = NULL; + char *root_driver = "native"; + char *lustre_driver = "llite"; + char *root_path = "/"; + unsigned mntflgs = 0; + + int err; + + srand(time(NULL)); + + signal(SIGUSR1, sighandler_USR1); + + lustre_path = getenv(ENV_LUSTRE_MNTPNT); + if (!lustre_path) { + lustre_path = "/mnt/lustre"; + } + + target = getenv(ENV_LUSTRE_MNTTGT); + if (!target) { + dumpfile = getenv(ENV_LUSTRE_DUMPFILE); + if (!dumpfile) { + CERROR("Neither mount target, nor dumpfile\n"); + exit(1); + } + g_zconf = 0; + printf("LibLustre: mount point %s, dumpfile %s\n", + lustre_path, dumpfile); + } else { + if (ll_parse_mount_target(target, + &g_zconf_mdsnid, + &g_zconf_mdsname, + &g_zconf_profile)) { + 
CERROR("mal-formed target %s \n", target); + exit(1); + } + g_zconf = 1; + printf("LibLustre: mount point %s, target %s\n", + lustre_path, target); + } + + if (_sysio_init() != 0) { + perror("init sysio"); + exit(1); + } + + /* cygwin don't need native driver */ +#ifndef __CYGWIN__ + _sysio_native_init(); +#endif + + err = _sysio_mount_root(root_path, root_driver, mntflgs, NULL); + if (err) { + perror(root_driver); + exit(1); + } + +#if 1 + portal_debug = 0; + portal_subsystem_debug = 0; +#endif + err = lllib_init(dumpfile); + if (err) { + perror("init llite driver"); + exit(1); + } + + err = mount("/", lustre_path, lustre_driver, mntflgs, NULL); + if (err) { + errno = -err; + perror(lustre_driver); + exit(1); + } + +#if 0 + __sysio_hook_sys_enter = llu_check_request; + __sysio_hook_sys_leave = NULL; +#endif } +void __liblustre_cleanup_(void) +{ + _sysio_shutdown(); + PtlFini(); +} diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index 409f9e2..f258ec9 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -4,12 +4,14 @@ #include #include #include -#include +#include #include #include #include +#define PAGE_CACHE_MAXBYTES ((__u64)(~0UL) << PAGE_CACHE_SHIFT) + struct ll_file_data { struct obd_client_handle fd_mds_och; __u32 fd_flags; @@ -18,25 +20,45 @@ struct ll_file_data { struct llu_sb_info { struct obd_uuid ll_sb_uuid; - struct lustre_handle ll_mdc_conn; - struct obd_export ll_osc_exp; + struct obd_export *ll_mdc_exp; + struct obd_export *ll_osc_exp; obd_id ll_rootino; int ll_flags; struct list_head ll_conn_chain; + + struct obd_uuid ll_mds_uuid; + struct obd_uuid ll_mds_peer_uuid; + char *ll_instance; }; +#define LLI_F_HAVE_OST_SIZE_LOCK 0 +#define LLI_F_HAVE_MDS_SIZE_LOCK 1 +#define LLI_F_PREFER_EXTENDED_SIZE 2 + struct llu_inode_info { - struct llu_sb_info *lli_sbi; - struct ll_fid lli_fid; - struct lov_stripe_md *lli_smd; - char *lli_symlink_name; - /*struct semaphore lli_open_sem;*/ + struct llu_sb_info 
*lli_sbi; + struct ll_fid lli_fid; + + struct lov_stripe_md *lli_smd; + char *lli_symlink_name; + struct semaphore lli_open_sem; + __u64 lli_maxbytes; unsigned long lli_flags; - struct list_head lli_read_extents; - /* in libsysio we have no chance to store data in file, - * so place it here */ - struct ll_file_data *lli_file_data; + /* for libsysio */ + struct file_identifier lli_sysio_fid; + + struct lookup_intent *lli_it; + + /* XXX workaround for libsysio */ + int lli_stale_flag; + + /* in libsysio we have no chance to store data in file, + * so place it here. since it's possible that a file + * was opened several times without close, we track an + * open_count here */ + struct ll_file_data *lli_file_data; + int lli_open_count; /* stat FIXME not 64 bit clean */ dev_t lli_st_dev; @@ -58,6 +80,29 @@ struct llu_inode_info { unsigned long lli_st_generation; }; +#define LLU_SYSIO_COOKIE_SIZE(x) \ + (sizeof(struct llu_sysio_cookie) + \ + sizeof(struct ll_async_page) * (x) + \ + sizeof(struct page) * (x)) + +struct llu_sysio_cookie { + struct obd_sync_io_container lsc_osic; + struct inode *lsc_inode; + int lsc_npages; + struct ll_async_page *lsc_llap; + struct page *lsc_pages; + __u64 lsc_rwcount; +}; + +/* XXX why uio.h haven't the definition? 
*/ +#define MAX_IOVEC 32 + +struct llu_sysio_callback_args +{ + int ncookies; + struct llu_sysio_cookie *cookies[MAX_IOVEC]; +}; + static inline struct llu_sb_info *llu_fs2sbi(struct filesys *fs) { return (struct llu_sb_info*)(fs->fs_private); @@ -73,6 +118,7 @@ static inline struct llu_sb_info *llu_i2sbi(struct inode *inode) return llu_i2info(inode)->lli_sbi; } +#if 0 static inline struct client_obd *sbi2mdc(struct llu_sb_info *sbi) { struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn); @@ -80,13 +126,110 @@ static inline struct client_obd *sbi2mdc(struct llu_sb_info *sbi) LBUG(); return &obd->u.cli; } +#endif + +static inline struct obd_export *llu_i2obdexp(struct inode *inode) +{ + return llu_i2info(inode)->lli_sbi->ll_osc_exp; +} -static inline struct lustre_handle *llu_i2obdconn(struct inode *inode) +static inline struct obd_export *llu_i2mdcexp(struct inode *inode) { - return &(llu_i2info(inode)->lli_sbi->ll_osc_conn); + return llu_i2info(inode)->lli_sbi->ll_mdc_exp; } +#define LL_SAVE_INTENT(inode, it) \ +do { \ + struct lookup_intent *temp; \ + LASSERT(llu_i2info(inode)->lli_it == NULL); \ + OBD_ALLOC(temp, sizeof(*temp)); \ + memcpy(temp, it, sizeof(*temp)); \ + llu_i2info(inode)->lli_it = temp; \ + CDEBUG(D_DENTRY, "alloc intent %p to inode %p(ino %lu)\n", \ + temp, inode, llu_i2info(inode)->lli_st_ino); \ +} while(0) + + +#define LL_GET_INTENT(inode, it) \ +do { \ + it = llu_i2info(inode)->lli_it; \ + \ + LASSERT(it); \ + llu_i2info(inode)->lli_it = NULL; \ + CDEBUG(D_DENTRY, "dettach intent %p from inode %p(ino %lu)\n", \ + it, inode, llu_i2info(inode)->lli_st_ino); \ +} while(0) + +/* interpret return codes from intent lookup */ +#define LL_LOOKUP_POSITIVE 1 +#define LL_LOOKUP_NEGATIVE 2 + +static inline void ll_inode2fid(struct ll_fid *fid, struct inode *inode) +{ + *fid = llu_i2info(inode)->lli_fid; +} + +struct it_cb_data { + struct inode *icbd_parent; + struct pnode *icbd_child; + obd_id hash; +}; + +static inline void ll_i2uctxt(struct 
ll_uctxt *ctxt, struct inode *i1, + struct inode *i2) +{ + struct llu_inode_info *lli1 = llu_i2info(i1); + struct llu_inode_info *lli2; + + LASSERT(i1); + LASSERT(ctxt); + + if (in_group_p(lli1->lli_st_gid)) + ctxt->gid1 = lli1->lli_st_gid; + else + ctxt->gid1 = -1; + + if (i2) { + lli2 = llu_i2info(i2); + if (in_group_p(lli2->lli_st_gid)) + ctxt->gid2 = lli2->lli_st_gid; + else + ctxt->gid2 = -1; + } else + ctxt->gid2 = 0; +} + + +typedef int (*intent_finish_cb)(struct ptlrpc_request *, + struct inode *parent, struct pnode *pnode, + struct lookup_intent *, int offset, obd_id ino); +int llu_intent_lock(struct inode *parent, struct pnode *pnode, + struct lookup_intent *, int flags, intent_finish_cb); + +/* FIXME */ +static inline int ll_permission(struct inode *inode, int flag, void * unused) +{ + return 0; +} + +#if 0 +static inline int it_disposition(struct lookup_intent *it, int flag) +{ + return it->d.lustre.it_disposition & flag; +} + +static inline void it_set_disposition(struct lookup_intent *it, int flag) +{ + it->d.lustre.it_disposition |= flag; +} +#endif + +static inline __u64 ll_file_maxbytes(struct inode *inode) +{ + return llu_i2info(inode)->lli_maxbytes; +} + struct mount_option_s { char *mdc_uuid; @@ -95,7 +238,14 @@ struct mount_option_s /* llite_lib.c */ void generate_random_uuid(unsigned char uuid_out[16]); +int liblustre_process_log(struct config_llog_instance *cfg); +int ll_parse_mount_target(const char *target, char **mdsnid, + char **mdsname, char **profile); +extern int g_zconf; +extern char *g_zconf_mdsnid; +extern char *g_zconf_mdsname; +extern char *g_zconf_profile; extern struct mount_option_s mount_option; /* super.c */ @@ -103,7 +253,11 @@ void llu_update_inode(struct inode *inode, struct mds_body *body, struct lov_stripe_md *lmm); void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid); void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid); -struct inode* llu_new_inode(struct filesys *fs, ino_t ino, 
mode_t mode); +//struct inode* llu_new_inode(struct filesys *fs, ino_t ino, mode_t mode); +//int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm, void *ostdata); +int ll_it_open_error(int phase, struct lookup_intent *it); +struct inode *llu_iget(struct filesys *fs, struct lustre_md *md); +int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm); extern struct fssw_ops llu_fssw_ops; @@ -116,19 +270,43 @@ void llu_prepare_mdc_op_data(struct mdc_op_data *data, int mode); int llu_create(struct inode *dir, struct pnode_base *pnode, int mode); int llu_iop_open(struct pnode *pnode, int flags, mode_t mode); +int llu_mdc_close(struct obd_export *mdc_exp, struct inode *inode); int llu_iop_close(struct inode *inode); -int llu_iop_ipreadv(struct inode *ino, - struct io_arguments *ioargs, - struct ioctx **ioctxp); -int llu_iop_ipwritev(struct inode *ino, - struct io_arguments *ioargs, - struct ioctx **ioctxp); +int llu_iop_ipreadv(struct inode *ino, struct ioctx *ioctxp); +int llu_iop_ipwritev(struct inode *ino, struct ioctx *ioctxp); +int llu_vmtruncate(struct inode * inode, loff_t offset); +void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid); +int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir); /* rw.c */ int llu_iop_iodone(struct ioctx *ioctxp __IS_UNUSED); -ssize_t llu_file_write(struct inode *inode, const struct iovec *iovec, +struct llu_sysio_callback_args* +llu_file_write(struct inode *inode, const struct iovec *iovec, size_t iovlen, loff_t pos); -ssize_t llu_file_read(struct inode *inode, const struct iovec *iovec, +struct llu_sysio_callback_args* +llu_file_read(struct inode *inode, const struct iovec *iovec, size_t iovlen, loff_t pos); +int llu_extent_lock_no_validate(struct ll_file_data *fd, + struct inode *inode, + struct lov_stripe_md *lsm, + int mode, + struct ldlm_extent *extent, + struct lustre_handle *lockh, + int ast_flags); +int llu_extent_lock(struct ll_file_data *fd, struct inode 
*inode, + struct lov_stripe_md *lsm, + int mode, struct ldlm_extent *extent, + struct lustre_handle *lockh); +int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + struct lustre_handle *lockh); + +/* namei.c */ +int llu_iop_lookup(struct pnode *pnode, + struct inode **inop, + struct intent *intnt, + const char *path); +void unhook_stale_inode(struct pnode *pno); +struct inode *llu_inode_from_lock(struct ldlm_lock *lock); #endif diff --git a/lustre/liblustre/lltest.c b/lustre/liblustre/lltest.c index acdc47e..ac6e8ad 100644 --- a/lustre/liblustre/lltest.c +++ b/lustre/liblustre/lltest.c @@ -27,132 +27,359 @@ #include #include #include -#include #include #include #include #include #include -#include +#include #include #include +#include "test_common.h" -int do_stat(const char *name) +#define ENTRY(str) \ + do { \ + char buf[100]; \ + int len; \ + sprintf(buf, "===== START: %s ", (str)); \ + len = strlen(buf); \ + if (len < 79) { \ + memset(buf+len, '=', 100-len); \ + buf[79] = '\n'; \ + buf[80] = 0; \ + } \ + printf("%s", buf); \ + } while (0) + +#define LEAVE() \ + do { \ + printf("----- END TEST successfully ---"); \ + printf("-----------------------------"); \ + printf("-------------------\n"); \ + } while (0) + +void t1() { - struct stat stat; + char *path="/mnt/lustre/test_t1"; + ENTRY("create/delete"); - if (lstat(name, &stat)) { - perror("failed to stat: "); - return -1; - } - printf("******* stat '%s' ********\n", name); - printf("ino:\t\t%lu\n",stat.st_ino); - printf("mode:\t\t%o\n",stat.st_mode); - printf("nlink:\t\t%d\n",stat.st_nlink); - printf("uid/gid:\t%d/%d\n", stat.st_uid, stat.st_gid); - printf("size:\t\t%ld\n", stat.st_size); - printf("blksize:\t%ld\n", stat.st_blksize); - printf("block count:\t%ld\n", stat.st_blocks); - printf("atime:\t\t%lu\n",stat.st_atime); - printf("mtime:\t\t%lu\n",stat.st_mtime); - printf("ctime:\t\t%lu\n",stat.st_ctime); - printf("******* end stat ********\n"); + 
t_touch(path); + t_unlink(path); + LEAVE(); +} - return 0; +void t2() +{ + char *path="/mnt/lustre/test_t2"; + ENTRY("mkdir/rmdir"); + + t_mkdir(path); + t_rmdir(path); + LEAVE(); } -/* - * Get stats of file and file system. - * - * Usage: test_stats [-a] [-r ] [-m ] [ ...] - */ -extern int lllib_init(char *arg); +void t3() +{ + char *path="/mnt/lustre/test_t3"; + ENTRY("regular stat"); -char *root_driver = "llite"; -char *root_path = "/"; -unsigned mntflgs = 0; -struct mount root_mount; + t_touch(path); + t_check_stat(path, NULL); + t_unlink(path); + LEAVE(); +} + +void t4() +{ + char *path="/mnt/lustre/test_t4"; + ENTRY("dir stat"); + + t_mkdir(path); + t_check_stat(path, NULL); + t_rmdir(path); + LEAVE(); +} -extern int portal_debug; -extern int portal_subsystem_debug; +#define PAGE_SIZE (4096) +#define _npages (512) -char* files[] = {"/dir1", "/dir1/file1", "/dir1/file2", "/dir1/dir2", "/dir1/dir2/file3"}; +static int _buffer[_npages][PAGE_SIZE/sizeof(int)]; -int -main(int argc, char * const argv[]) +/* pos: i/o start from + * xfer: npages per transfer + */ +static void pages_io(int xfer, loff_t pos) { - struct stat statbuf; - int rc, err, i, fd, written, readed; - char pgbuf[4096], readbuf[4096]; - int npages; - - if (_sysio_init() != 0) { - perror("init sysio"); - exit(1); - } - err = lllib_init(argv[1]); - if (err) { - perror("init llite driver"); - exit(1); - } - - err = _sysio_mount_root(root_path, root_driver, mntflgs, NULL); - if (err) { - errno = -err; - perror(root_driver); - exit(1); - } -#if 0 - for (i=0; i< sizeof(files)/sizeof(char*); i++) { - printf("******** stat %s *********\n", files[i]); - /* XXX ugly, only for testing */ - err = fixme_lstat(files[i], &statbuf); - if (err) - perror(root_driver); - printf("******** end stat %s: %d*********\n", files[i], err); - } -#endif -#if 0 - portal_debug = 0; - portal_subsystem_debug = 0; - npages = 10; - - fd = open("/newfile01", O_RDWR|O_CREAT|O_TRUNC, 00664); - printf("***************** open return %d 
****************\n", fd); - - printf("***************** begin write pages ****************\n"); - for (i = 0; i < npages; i++ ) { - memset(pgbuf, ('A'+ i%10), 4096); - written = write(fd, pgbuf, 4096); - printf(">>> page %d: %d bytes written\n", i, written); + char *path="/mnt/lustre/test_t5"; + int check_sum[_npages] = {0,}; + int fd, rc, i, j; + + memset(_buffer, 0, sizeof(_buffer)); + + /* create sample data */ + for (i = 0; i < _npages; i++) { + for (j = 0; j < PAGE_SIZE/sizeof(int); j++) { + _buffer[i][j] = rand(); + } + } + + /* compute checksum */ + for (i = 0; i < _npages; i++) { + for (j = 0; j < PAGE_SIZE/sizeof(int); j++) { + check_sum[i] += _buffer[i][j]; + } + } + + t_touch(path); + + fd = t_open(path); + + /* write */ + lseek(fd, pos, SEEK_SET); + for (i = 0; i < _npages; i += xfer) { + rc = write(fd, _buffer[i], PAGE_SIZE * xfer); + if (rc != PAGE_SIZE * xfer) { + printf("write error %d (i = %d)\n", rc, i); + exit(1); + } } + printf("succefully write %d pages\n", _npages); - printf("***************** begin read pages ****************\n"); - lseek(fd, 0, SEEK_SET); + memset(_buffer, 0, sizeof(_buffer)); - for (i = 0; i < npages; i++ ) { - memset(readbuf, '8', 4096); - readed = read(fd, readbuf, 4096); - readbuf[10] = 0; - printf("<<< page %d: %d bytes (%s)\n", i, readed, readbuf); + /* read */ + lseek(fd, pos, SEEK_SET); + for (i = 0; i < _npages; i += xfer) { + rc = read(fd, _buffer[i], PAGE_SIZE * xfer); + if (rc != PAGE_SIZE * xfer) { + printf("read error %d (i = %d)\n", rc, i); + exit(1); + } } - close(fd); -#endif + printf("succefully read %d pages\n", _npages); + + /* compute checksum */ + for (i = 0; i < _npages; i++) { + int sum = 0; + for (j = 0; j < PAGE_SIZE/sizeof(int); j++) { + sum += _buffer[i][j]; + } + if (sum != check_sum[i]) { + printf("chunk %d checksum error: expected 0x%x, get 0x%x\n", + i, check_sum[i], sum); + } + } + printf("checksum verified OK!\n"); + + t_close(fd); + t_unlink(path); +} + +void t5() +{ + char text[256]; + 
loff_t off_array[] = {1, 17, 255, 257, 4095, 4097, 8191, 1024*1024*1024}; + int np = 1, i; + loff_t offset = 0; + + while (np <= _npages) { + sprintf(text, "pages_io: %d per transfer, offset %lld", + np, offset); + ENTRY(text); + pages_io(np, offset); + LEAVE(); + np += np; + } + + for (i = 0; i < sizeof(off_array)/sizeof(loff_t); i++) { + offset = off_array[i]; + sprintf(text, "pages_io: 16 per transfer, offset %lld", + offset); + ENTRY(text); + pages_io(16, offset); + LEAVE(); + } +} + +void t6() +{ + char *path="/mnt/lustre/test_t6"; + char *path2="/mnt/lustre/test_t6_link"; + ENTRY("symlink"); + + t_touch(path); + t_symlink(path, path2); + t_check_stat(path2, NULL); + t_unlink(path2); + t_unlink(path); + LEAVE(); +} + +void t7() +{ + char *path="/mnt/lustre/test_t7"; + ENTRY("mknod"); + + t_mknod(path, S_IFCHR | 0644, 5, 4); + t_check_stat(path, NULL); + t_unlink(path); + LEAVE(); +} + +void t8() +{ + char *path="/mnt/lustre/test_t8"; + ENTRY("chmod"); + + t_touch(path); + t_chmod_raw(path, 0700); + t_check_stat(path, NULL); + t_unlink(path); + LEAVE(); +} + +void t9() +{ + char *path="/mnt/lustre/test_t9"; + char *path2="/mnt/lustre/test_t9_link"; + ENTRY("hard link"); + + t_touch(path); + t_link(path, path2); + t_check_stat(path, NULL); + t_check_stat(path2, NULL); + t_unlink(path); + t_unlink(path2); + LEAVE(); +} + +void t10() +{ + char *dir1="/mnt/lustre/test_t10_dir1"; + char *dir2="/mnt/lustre/test_t10_dir2"; + char *path1="/mnt/lustre/test_t10_reg1"; + char *path2="/mnt/lustre/test_t10_reg2"; + char *rename1="/mnt/lustre/test_t10_dir1/rename1"; + char *rename2="/mnt/lustre/test_t10_dir2/rename2"; + char *rename3="/mnt/lustre/test_t10_dir2/rename3"; + ENTRY("rename"); + + t_mkdir(dir1); + t_mkdir(dir2); + t_touch(path1); + t_touch(path2); + t_rename(path1, rename1); + t_rename(path2, rename2); + t_rename(rename1, rename2); + t_rename(dir1, rename3); + t_unlink(rename2); + t_rmdir(rename3); + t_rmdir(dir2); + LEAVE(); +} + +void t100() +{ + char 
*base="/mnt/lustre"; + char path[4096], path2[4096]; + int i, j, level = 5, nreg = 5; + ENTRY("deep tree"); + + strcpy(path, base); + + for (i = 0; i < level; i++) { + for (j = 0; j < nreg; j++) { + sprintf(path2, "%s/file%d", path, j); + t_touch(path2); + } + + strcat(path, "/dir"); + t_mkdir(path); + } + + for (i = level; i > 0; i--) { + strcpy(path, base); + for (j = 1; j < i; j++) + strcat(path, "/dir"); + + for (j = 0; j < nreg; j++) { + sprintf(path2, "%s/file%d", path, j); + t_unlink(path2); + } + + strcat(path, "/dir"); + t_rmdir(path); + } + + LEAVE(); +} + +extern void __liblustre_setup_(void); +extern void __liblustre_cleanup_(void); + +void usage(char *cmd) +{ + printf("Usage: \t%s --target mdsnid:/mdsname/profile\n", cmd); + printf(" \t%s --dumpfile dumpfile\n", cmd); + exit(-1); +} + +int main(int argc, char * const argv[]) +{ + int opt_index, c; + static struct option long_opts[] = { + {"target", 1, 0, 0}, + {"dumpfile", 1, 0, 0}, + {0, 0, 0, 0} + }; + + if (argc <= 1) + usage(argv[0]); + + while ((c = getopt_long(argc, argv, "", long_opts, &opt_index)) != -1) { + switch (c) { + case 0: { + printf("optindex %d\n", opt_index); + if (!optarg[0]) + usage(argv[0]); + + if (!strcmp(long_opts[opt_index].name, "target")) { + setenv(ENV_LUSTRE_MNTTGT, optarg, 1); + } else if (!strcmp(long_opts[opt_index].name, "dumpfile")) { + setenv(ENV_LUSTRE_DUMPFILE, optarg, 1); + } else + usage(argv[0]); + break; + } + default: + usage(argv[0]); + } + } + + if (optind != argc) + usage(argv[0]); + + __liblustre_setup_(); + +#ifndef __CYGWIN__ + t1(); + t2(); + t3(); + t4(); + t5(); + t6(); + t7(); + t8(); + t9(); + t10(); -#if 1 - //rc = chown("/newfile01", 10, 20); - rc = chmod("/newfile01", 0777); - printf("-------------- chmod return %d -----------\n", rc); - do_stat("/newfile01"); + t100(); #endif - printf("sysio is about shutdown\n"); - /* - * Clean up. 
- */ - _sysio_shutdown(); + printf("liblustre is about shutdown\n"); + __liblustre_cleanup_(); printf("complete successfully\n"); return 0; diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c new file mode 100644 index 0000000..1dceb8b --- /dev/null +++ b/lustre/liblustre/namei.c @@ -0,0 +1,634 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Lustre Light Super operations + * + * Copyright (c) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ */ + +#define DEBUG_SUBSYSTEM S_LLITE + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "llite_lib.h" + +static void ll_intent_release(struct lookup_intent *it) +{ + struct lustre_handle *handle; + ENTRY; + + /* LASSERT(ll_d2d(de) != NULL); */ + + if (it->d.lustre.it_lock_mode) { + handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle; + CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64 + " from it %p\n", + handle->cookie, it); + ldlm_lock_decref(handle, it->d.lustre.it_lock_mode); + + /* intent_release may be called multiple times, from + this thread and we don't want to double-decref this + lock (see bug 494) */ + it->d.lustre.it_lock_mode = 0; + } + it->it_magic = 0; + it->it_op_release = 0; + EXIT; +} + +#if 0 +static void llu_mdc_lock_set_inode(struct lustre_handle *lockh, + struct inode *inode) +{ + struct ldlm_lock *lock = ldlm_handle2lock(lockh); + ENTRY; + + LASSERT(lock != NULL); + lock->l_data = inode; + LDLM_LOCK_PUT(lock); + EXIT; +} + +static int pnode_revalidate_finish(struct ptlrpc_request *request, + struct inode *parent, struct pnode *pnode, + struct lookup_intent *it, int offset, + obd_id ino) +{ + struct llu_sb_info *sbi = llu_i2sbi(parent); + struct pnode_base *pb = pnode->p_base; + struct mds_body *body; + struct lov_stripe_md *lsm = NULL; + struct lov_mds_md *lmm; + int lmmsize; + int rc = 0; + ENTRY; + + /* NB 1 request reference will be taken away by ll_intent_lock() + * when I return */ + + if (it_disposition(it, DISP_LOOKUP_NEG)) + RETURN(-ENOENT); + + /* We only get called if the mdc_enqueue() called from + * ll_intent_lock() was successful. 
Therefore the mds_body is + * present and correct, and the eadata is present (but still + * opaque, so only obd_unpackmd() can check the size) */ + body = lustre_msg_buf(request->rq_repmsg, offset, sizeof (*body)); + LASSERT (body != NULL); + LASSERT_REPSWABBED (request, offset); + + if (body->valid & OBD_MD_FLEASIZE) { + /* Only bother with this if inodes's LSM not set? */ + + if (body->eadatasize == 0) { + CERROR ("OBD_MD_FLEASIZE set, but eadatasize 0\n"); + GOTO (out, rc = -EPROTO); + } + lmmsize = body->eadatasize; + lmm = lustre_msg_buf (request->rq_repmsg, offset + 1, lmmsize); + LASSERT (lmm != NULL); + LASSERT_REPSWABBED (request, offset + 1); + + rc = obd_unpackmd (&sbi->ll_osc_conn, + &lsm, lmm, lmmsize); + if (rc < 0) { + CERROR ("Error %d unpacking eadata\n", rc); + LBUG(); + /* XXX don't know if I should do this... */ + GOTO (out, rc); + /* or skip the ll_update_inode but still do + * mdc_lock_set_inode() */ + } + LASSERT (rc >= sizeof (*lsm)); + rc = 0; + } + + llu_update_inode(pb->pb_ino, body, lsm); + + if (lsm != NULL && + llu_i2info(pb->pb_ino)->lli_smd != lsm) + obd_free_memmd (&sbi->ll_osc_conn, &lsm); + + llu_mdc_lock_set_inode((struct lustre_handle *)&it->d.lustre.it_lock_handle, + pb->pb_ino); + out: + RETURN(rc); +} +#endif + +/* + * remove the stale inode from pnode + */ +void unhook_stale_inode(struct pnode *pno) +{ + struct inode *inode = pno->p_base->pb_ino; + ENTRY; + + LASSERT(inode); + LASSERT(llu_i2info(inode)->lli_stale_flag); + + pno->p_base->pb_ino = NULL; + + if (!llu_i2info(inode)->lli_open_count) { + CDEBUG(D_INODE, "unhook inode %p (ino %lu) from pno %p\n", + inode, llu_i2info(inode)->lli_st_ino, pno); + I_RELE(inode); + if (!inode->i_ref) + _sysio_i_gone(inode); + } + + EXIT; + return; +} + +void llu_lookup_finish_locks(struct lookup_intent *it, struct pnode *pnode) +{ + LASSERT(it); + LASSERT(pnode); + + if (it && pnode->p_base->pb_ino != NULL) { + struct inode *inode = pnode->p_base->pb_ino; + CDEBUG(D_DLMTRACE, "setting 
l_data to inode %p (%lu/%lu)\n", + inode, llu_i2info(inode)->lli_st_ino, + llu_i2info(inode)->lli_st_generation); + mdc_set_lock_data(&it->d.lustre.it_lock_handle, inode); + } + + /* drop IT_LOOKUP locks */ + if (it->it_op == IT_LOOKUP) + ll_intent_release(it); + +} + +static inline void ll_invalidate_inode_pages(struct inode * inode) +{ + /* do nothing */ +} + +static int llu_mdc_blocking_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, + void *data, int flag) +{ + int rc; + struct lustre_handle lockh; + ENTRY; + + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) { + CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc); + RETURN(rc); + } + break; + case LDLM_CB_CANCELING: { + struct inode *inode = llu_inode_from_lock(lock); + struct llu_inode_info *lli; + + /* Invalidate all dentries associated with this inode */ + if (inode == NULL) + break; + + lli = llu_i2info(inode); + + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); + + if (lock->l_resource->lr_name.name[0] != lli->lli_st_ino || + lock->l_resource->lr_name.name[1] != lli->lli_st_generation) { + LDLM_ERROR(lock, "data mismatch with ino %lu/%lu", + lli->lli_st_ino, lli->lli_st_generation); + } + if (S_ISDIR(lli->lli_st_mode)) { + CDEBUG(D_INODE, "invalidating inode %lu\n", + lli->lli_st_ino); + + ll_invalidate_inode_pages(inode); + } + +/* + if (inode->i_sb->s_root && + inode != inode->i_sb->s_root->d_inode) + ll_unhash_aliases(inode); +*/ + I_RELE(inode); + break; + } + default: + LBUG(); + } + + RETURN(0); +} + +int llu_pb_revalidate(struct pnode *pnode, int flags, struct lookup_intent *it) +{ + struct pnode_base *pb = pnode->p_base; + struct ll_fid pfid, cfid; + struct it_cb_data icbd; + struct ll_uctxt ctxt; + struct ptlrpc_request *req = NULL; + struct lookup_intent lookup_it = { .it_op = IT_LOOKUP }; + struct obd_export *exp; + int rc; + ENTRY; + + CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%x\n", + pb->pb_name.name, it ? 
it->it_op : 0); + + /* We don't want to cache negative dentries, so return 0 immediately. + * We believe that this is safe, that negative dentries cannot be + * pinned by someone else */ + if (pb->pb_ino == NULL) { + CDEBUG(D_INODE, "negative pb\n"); + RETURN(0); + } + + /* check stale inode */ + if (llu_i2info(pb->pb_ino)->lli_stale_flag) + unhook_stale_inode(pnode); + + /* check again because unhook_stale_inode() might generate + * negative pnode */ + if (pb->pb_ino == NULL) { + CDEBUG(D_INODE, "negative pb\n"); + RETURN(0); + } + + /* This is due to bad interaction with libsysio. remove this when we + * switched to libbsdio + */ + { + struct llu_inode_info *lli = llu_i2info(pb->pb_ino); + if (lli->lli_it) { + CDEBUG(D_INODE, "inode %lu still have intent " + "%p(opc 0x%x), release it\n", + lli->lli_st_ino, lli->lli_it, + lli->lli_it->it_op); + ll_intent_release(lli->lli_it); + OBD_FREE(lli->lli_it, sizeof(*lli->lli_it)); + lli->lli_it = NULL; + } + } + + exp = llu_i2mdcexp(pb->pb_ino); + ll_inode2fid(&pfid, pnode->p_parent->p_base->pb_ino); + ll_inode2fid(&cfid, pb->pb_ino); + icbd.icbd_parent = pnode->p_parent->p_base->pb_ino; + icbd.icbd_child = pnode; + + if (!it) { + it = &lookup_it; + it->it_op_release = ll_intent_release; + } + + ll_i2uctxt(&ctxt, pnode->p_parent->p_base->pb_ino, pb->pb_ino); + + rc = mdc_intent_lock(exp, &ctxt, &pfid, + pb->pb_name.name, pb->pb_name.len, + NULL, 0, &cfid, it, flags, &req, + llu_mdc_blocking_ast); + /* If req is NULL, then mdc_intent_lock only tried to do a lock match; + * if all was well, it will return 1 if it found locks, 0 otherwise. 
*/ + if (req == NULL && rc >= 0) + GOTO(out, rc); + + /* unfortunately ll_intent_lock may cause a callback and revoke our + dentry */ + /* + spin_lock(&dcache_lock); + list_del_init(&de->d_hash); + spin_unlock(&dcache_lock); + d_rehash(de); + */ + if (it->it_op & (IT_OPEN | IT_GETATTR)) + LL_SAVE_INTENT(pb->pb_ino, it); + RETURN(1); + out: + if (req) + ptlrpc_req_finished(req); + if (rc == 0) { + LASSERT(pb->pb_ino); + if (S_ISDIR(llu_i2info(pb->pb_ino)->lli_st_mode)) + ll_invalidate_inode_pages(pb->pb_ino); + llu_i2info(pb->pb_ino)->lli_stale_flag = 1; + unhook_stale_inode(pnode); + } else { + llu_lookup_finish_locks(it, pnode); + llu_i2info(pb->pb_ino)->lli_stale_flag = 0; + if (it->it_op & (IT_OPEN | IT_GETATTR)) + LL_SAVE_INTENT(pb->pb_ino, it); + } + RETURN(rc); +} + +static int lookup_it_finish(struct ptlrpc_request *request, int offset, + struct lookup_intent *it, void *data) +{ + struct it_cb_data *icbd = data; + struct pnode *child = icbd->icbd_child; + struct inode *parent = icbd->icbd_parent; + struct llu_sb_info *sbi = llu_i2sbi(parent); + struct inode *inode = NULL; + int rc; + + /* NB 1 request reference will be taken away by ll_intent_lock() + * when I return */ + /* XXX libsysio require the inode must be generated here XXX */ + if ((it->it_op & IT_CREAT) || !it_disposition(it, DISP_LOOKUP_NEG)) { + struct lustre_md md; + struct llu_inode_info *lli; + ENTRY; + + rc = mdc_req2lustre_md(request, offset, sbi->ll_osc_exp, &md); + if (rc) + RETURN(rc); + + inode = llu_iget(parent->i_fs, &md); + if (!inode) { + /* free the lsm if we allocated one above */ + if (md.lsm != NULL) + obd_free_memmd(sbi->ll_osc_exp, &md.lsm); + RETURN(-ENOMEM); + } else if (md.lsm != NULL && + llu_i2info(inode)->lli_smd != md.lsm) { + obd_free_memmd(sbi->ll_osc_exp, &md.lsm); + } + + lli = llu_i2info(inode); + + /* If this is a stat, get the authoritative file size */ + if (it->it_op == IT_GETATTR && S_ISREG(lli->lli_st_mode) && + lli->lli_smd != NULL) { + struct ldlm_extent 
extent = {0, OBD_OBJECT_EOF}; + struct lustre_handle lockh = {0}; + struct lov_stripe_md *lsm = lli->lli_smd; + ldlm_error_t rc; + + LASSERT(lsm->lsm_object_id != 0); + + rc = llu_extent_lock(NULL, inode, lsm, LCK_PR, &extent, + &lockh); + if (rc != ELDLM_OK) { + I_RELE(inode); + RETURN(-EIO); + } + llu_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh); + } + } else { + ENTRY; + } + + if (inode && (it->it_op & (IT_OPEN | IT_GETATTR))) + LL_SAVE_INTENT(inode, it); +/* + dentry->d_op = &ll_d_ops; + ll_set_dd(dentry); + + if (dentry == saved) + d_add(dentry, inode); +*/ + child->p_base->pb_ino = inode; + + RETURN(0); +} + +struct inode *llu_inode_from_lock(struct ldlm_lock *lock) +{ + struct inode *inode; + l_lock(&lock->l_resource->lr_namespace->ns_lock); + + if (lock->l_ast_data) { + inode = (struct inode *)lock->l_ast_data; + I_REF(inode); + } else + inode = NULL; + + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + return inode; +} + +/* XXX */ +#define EXT2_NAME_LEN (255) + +static int llu_lookup_it(struct inode *parent, struct pnode *pnode, + struct lookup_intent *it, int flags) +{ + struct ll_fid pfid; + struct ll_uctxt ctxt; + struct it_cb_data icbd; + struct ptlrpc_request *req = NULL; + struct lookup_intent lookup_it = { .it_op = IT_LOOKUP }; + int rc; + ENTRY; + + if (pnode->p_base->pb_name.len > EXT2_NAME_LEN) + RETURN(-ENAMETOOLONG); + + +/* + CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n", + dentry->d_name.name, parent->i_ino, parent->i_generation, + parent, LL_IT2STR(it)); + + if (d_mountpoint(dentry)) + CERROR("Tell Peter, lookup on mtpt, it %s\n", LL_IT2STR(it)); + + ll_frob_intent(&it, &lookup_it); +*/ + + if (!it) { + it = &lookup_it; + it->it_op_release = ll_intent_release; + } + + icbd.icbd_child = pnode; + icbd.icbd_parent = parent; + icbd.icbd_child = pnode; + ll_inode2fid(&pfid, parent); + ll_i2uctxt(&ctxt, parent, NULL); + + rc = mdc_intent_lock(llu_i2mdcexp(parent), &ctxt, &pfid, + pnode->p_base->pb_name.name, + 
pnode->p_base->pb_name.len, + NULL, 0, NULL, it, flags, &req, + llu_mdc_blocking_ast); + if (rc < 0) + GOTO(out, rc); + + rc = lookup_it_finish(req, 1, it, &icbd); + if (rc != 0) { + ll_intent_release(it); + GOTO(out, rc); + } + + llu_lookup_finish_locks(it, pnode); + +/* + if (dentry == save) + GOTO(out, retval = NULL); + else + GOTO(out, retval = dentry); +*/ + out: + if (req) + ptlrpc_req_finished(req); + return rc; +} + +static struct lookup_intent* +translate_lookup_intent(struct intent *intent, const char *path) +{ + struct lookup_intent *it; + int fmode; + + /* libsysio trick */ + if (!intent || path) { + CDEBUG(D_VFSTRACE, "not intent needed\n"); + return NULL; + } + + OBD_ALLOC(it, sizeof(*it)); + LASSERT(it); + + memset(it, 0, sizeof(*it)); + + /* libsysio will assign intent like following: + * NOTE: INT_CREAT has include INT_UPDPARENT + * + * open: INT_OPEN [| INT_CREAT] + * mkdir: INT_CREAT + * symlink: INT_CREAT + * unlink: INT_UPDPARENT + * rmdir: INT_UPDPARENT + * mknod: INT_CREAT + * stat: INT_GETATTR + * setattr: NULL + * + * following logic is adjusted for libsysio + */ + + it->it_flags = intent->int_arg2 ? 
*((int*)intent->int_arg2) : 0; + + if (intent->int_opmask & INT_OPEN) { + it->it_op |= IT_OPEN; + + /* convert access mode from O_ to FMODE_ */ + if (it->it_flags & O_WRONLY) + fmode = FMODE_WRITE; + else if (it->it_flags & O_RDWR) + fmode = FMODE_READ | FMODE_WRITE; + else + fmode = FMODE_READ; + it->it_flags &= ~O_ACCMODE; + it->it_flags |= fmode; + } + + /* + else if (intent->int_opmask & INT_CREAT) + it->it_op |= IT_LOOKUP; + */ + + /* FIXME libsysio has strange code on intent handling, + * more check later */ + if (it->it_flags & O_CREAT) { + it->it_op |= IT_CREAT; + it->it_create_mode = *((int*)intent->int_arg1); + } + + if (intent->int_opmask & INT_GETATTR) + it->it_op |= IT_GETATTR; + /* XXX */ + if (intent->int_opmask & INT_SETATTR) + LBUG(); + + /* libsysio is different to linux vfs when doing unlink/rmdir, + * INT_UPDPARENT was passed down during name resolution. Here + * we treat it as normal lookup, later unlink()/rmdir() will + * do the actual work */ + + /* conform to kernel code, if only IT_LOOKUP was set, don't + * pass down it */ + if (!it->it_op || it->it_op == IT_LOOKUP) { + OBD_FREE(it, sizeof(*it)); + it = NULL; + } + if (it) + it->it_op_release = ll_intent_release; + + CDEBUG(D_VFSTRACE, "final intent 0x%x\n", it ? it->it_op : 0); + return it; +} + +int llu_iop_lookup(struct pnode *pnode, + struct inode **inop, + struct intent *intnt, + const char *path) +{ + struct lookup_intent *it; + int rc; + ENTRY; + + *inop = NULL; + + /* the mount root inode have no name, so don't call + * remote in this case. but probably we need revalidate + * it here? 
FIXME */ + if (pnode->p_mount->mnt_root == pnode) { + struct inode *i = pnode->p_base->pb_ino; + *inop = i; + return 0; + } + + if (!pnode->p_base->pb_name.len) + RETURN(-EINVAL); + + it = translate_lookup_intent(intnt, path); + + /* param flags is not used, let it be 0 */ + if (llu_pb_revalidate(pnode, 0, it)) { + LASSERT(pnode->p_base->pb_ino); + *inop = pnode->p_base->pb_ino; + RETURN(0); + } + + rc = llu_lookup_it(pnode->p_parent->p_base->pb_ino, pnode, it, 0); + if (!rc) { + if (!pnode->p_base->pb_ino) + rc = -ENOENT; + else + *inop = pnode->p_base->pb_ino; + } + + RETURN(rc); +} + diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index c5df187..70fd26e 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -25,7 +25,6 @@ #include #include -#include #include #include #include @@ -39,28 +38,80 @@ #include "llite_lib.h" -int llu_iop_iodone(struct ioctx *ioctxp __IS_UNUSED) -{ - return 1; -} - -/* - * this grabs a lock and manually implements behaviour that makes it look - * like the OST is returning the file size with each lock acquisition - */ -int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, - struct lov_stripe_md *lsm, - int mode, struct ldlm_extent *extent, - struct lustre_handle *lockh) +#if 0 +void llu_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, + struct ldlm_lock *lock) { + clear_bit(LLI_F_HAVE_SIZE_LOCK, &(llu_i2info(inode)->lli_flags)); #if 0 - struct ll_inode_info *lli = ll_i2info(inode); - int rc; + struct ldlm_extent *extent = &lock->l_extent; + unsigned long start, end, count, skip, i, j; + struct page *page; + int ret; ENTRY; - rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh); - if (rc != ELDLM_OK) - RETURN(rc); + CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n", + inode->i_ino, inode, extent->start, extent->end, inode->i_size); + + start = extent->start >> PAGE_CACHE_SHIFT; + count = ~0; + skip = 0; + end = (extent->end >> PAGE_CACHE_SHIFT) + 1; + 
if ((end << PAGE_CACHE_SHIFT) < extent->end) + end = ~0; + if (lsm->lsm_stripe_count > 1) { + struct { + char name[16]; + struct ldlm_lock *lock; + struct lov_stripe_md *lsm; + } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm }; + __u32 stripe; + __u32 vallen = sizeof(stripe); + int rc; + + /* get our offset in the lov */ + rc = obd_get_info(ll_i2obdconn(inode), sizeof(key), + &key, &vallen, &stripe); + if (rc != 0) { + CERROR("obd_get_info: rc = %d\n", rc); + LBUG(); + } + LASSERT(stripe < lsm->lsm_stripe_count); + + count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT; + skip = (lsm->lsm_stripe_count - 1) * count; + start += (start/count * skip) + (stripe * count); + if (end != ~0) + end += (end/count * skip) + (stripe * count); + } + + i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT; + if (end >= i) + clear_bit(LLI_F_HAVE_SIZE_LOCK, &(ll_i2info(inode)->lli_flags)); + if (i < end) + end = i; + + CDEBUG(D_INODE, "start: %lu j: %lu count: %lu skip: %lu end: %lu\n", + start, start % count, count, skip, end); + + /* start writeback on dirty pages in the extent when its PW */ + for (i = start, j = start % count; + lock->l_granted_mode == LCK_PW && i < end; j++, i++) { + if (j == count) { + i += skip; + j = 0; + } + /* its unlikely, but give us a chance to bail when we're out */ + PGCACHE_WRLOCK(inode->i_mapping); + if (list_empty(&inode->i_mapping->dirty_pages)) { + CDEBUG(D_INODE, "dirty list empty\n"); + PGCACHE_WRUNLOCK(inode->i_mapping); + break; + } + PGCACHE_WRUNLOCK(inode->i_mapping); + + if (need_resched()) + schedule(); /* always do a getattr for the first person to pop out of lock * acquisition.. the DID_GETATTR flag and semaphore serialize @@ -78,273 +129,496 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, if (rc == 0) { set_bit(LLI_F_DID_GETATTR, &lli->lli_flags); } else { - /* XXX can this fail? 
*/ - ll_extent_unlock(fd, inode, lsm, mode, lockh); + unlock_page(page); } + page_cache_release(page); + } - up(&lli->lli_getattr_sem); - RETURN(rc); + /* our locks are page granular thanks to osc_enqueue, we invalidate the + * whole page. */ + LASSERT((extent->start & ~PAGE_CACHE_MASK) == 0); + LASSERT(((extent->end+1) & ~PAGE_CACHE_MASK) == 0); + for (i = start, j = start % count ; i < end ; j++, i++) { + if ( j == count ) { + i += skip; + j = 0; + } + PGCACHE_WRLOCK(inode->i_mapping); + if (list_empty(&inode->i_mapping->dirty_pages) && + list_empty(&inode->i_mapping->clean_pages) && + list_empty(&inode->i_mapping->locked_pages)) { + CDEBUG(D_INODE, "nothing left\n"); + PGCACHE_WRUNLOCK(inode->i_mapping); + break; + } + PGCACHE_WRUNLOCK(inode->i_mapping); + if (need_resched()) + schedule(); + page = find_get_page(inode->i_mapping, i); + if (page == NULL) + continue; + CDEBUG(D_INODE, "dropping page %p at %lu\n", page, page->index); + lock_page(page); + if (page->mapping) /* might have raced */ +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + truncate_complete_page(page); #else - return ELDLM_OK; + truncate_complete_page(page->mapping, page); +#endif + unlock_page(page); + page_cache_release(page); + } + EXIT; #endif } -int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode, - struct lov_stripe_md *lsm, int mode, - struct lustre_handle *lockh) +int llu_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, + void *data, int flag) { -#if 0 - struct ll_sb_info *sbi = ll_i2sbi(inode); + struct inode *inode = data; + struct llu_inode_info *lli = llu_i2info(inode); + struct lustre_handle lockh = {0}; int rc; ENTRY; - /* XXX phil: can we do this? won't it screw the file size up? 
*/ - if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || - (sbi->ll_flags & LL_SBI_NOLCK)) - RETURN(0); + if (inode == NULL) + LBUG(); - rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh); + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel failed: %d\n", rc); + break; + case LDLM_CB_CANCELING: { + /* FIXME: we could be given 'canceling intents' so that we + * could know to write-back or simply throw away the pages + * based on if the cancel comes from a desire to, say, + * read or truncate.. */ + llu_pgcache_remove_extent(inode, lli->lli_smd, lock); + break; + } + default: + LBUG(); + } - RETURN(rc); -#else - return 0; + RETURN(0); +} #endif + +static int llu_extent_lock_callback(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag) +{ + struct lustre_handle lockh = { 0 }; + int rc; + ENTRY; + + + if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) { + LDLM_ERROR(lock, "cancelling lock with bad data %p", data); + LBUG(); + } + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel failed: %d\n", rc); + break; + case LDLM_CB_CANCELING: { + struct inode *inode = llu_inode_from_lock(lock); + struct llu_inode_info *lli; + + if (!inode) + RETURN(0); + lli= llu_i2info(inode); + if (!lli) { + I_RELE(inode); + RETURN(0); + } + if (!lli->lli_smd) { + I_RELE(inode); + RETURN(0); + } + +/* + ll_pgcache_remove_extent(inode, lli->lli_smd, lock); + iput(inode); +*/ + I_RELE(inode); + break; + } + default: + LBUG(); + } + + RETURN(0); } -static int llu_brw(int cmd, struct inode *inode, struct page *page, int flags) +int llu_extent_lock_no_validate(struct ll_file_data *fd, + struct inode *inode, + struct lov_stripe_md *lsm, + int mode, + struct ldlm_extent *extent, + struct lustre_handle *lockh, + int ast_flags) { + struct llu_sb_info *sbi = 
llu_i2sbi(inode); struct llu_inode_info *lli = llu_i2info(inode); - struct lov_stripe_md *lsm = lli->lli_smd; - struct brw_page pg; int rc; ENTRY; - pg.pg = page; - pg.off = ((obd_off)page->index) << PAGE_SHIFT; + LASSERT(lockh->cookie == 0); - /* FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME */ #if 0 - if (cmd == OBD_BRW_WRITE && (pg.off + PAGE_SIZE > lli->lli_st_size)) - pg.count = lli->lli_st_size % PAGE_SIZE; - else + /* XXX phil: can we do this? won't it screw the file size up? */ + if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || + (sbi->ll_flags & LL_SBI_NOLCK)) + RETURN(0); #endif - pg.count = PAGE_SIZE; - CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n", - cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, lli->lli_st_ino, - pg.off, pg.off); - if (pg.count == 0) { - LBUG(); - } - - pg.flag = flags; + CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n", + lli->lli_st_ino, extent->start, extent->end); - rc = obd_brw(cmd, llu_i2obdconn(inode), lsm, 1, &pg, set, NULL); - if (rc) { - CERROR("error from obd_brw: rc = %d\n", rc); - } + rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent, + sizeof(extent), mode, &ast_flags, + llu_extent_lock_callback, inode, lockh); RETURN(rc); } -static int llu_prepare_write(struct inode *inode, struct page *page, - unsigned from, unsigned to) +/* + * this grabs a lock and manually implements behaviour that makes it look like + * the OST is returning the file size with each lock acquisition. 
+ */ +int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + struct ldlm_extent *extent, struct lustre_handle *lockh) { struct llu_inode_info *lli = llu_i2info(inode); - obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; - int rc = 0; + struct obd_export *exp = llu_i2obdexp(inode); + struct ldlm_extent size_lock; + struct lustre_handle match_lockh = {0}; + int flags, rc, matched; ENTRY; -#if 0 - if (!PageLocked(page)) - LBUG(); - - if (PageUptodate(page)) - RETURN(0); + rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0); + if (rc != ELDLM_OK) + RETURN(rc); - //POISON(addr + from, 0xca, to - from); -#endif - /* We're completely overwriting an existing page, so _don't_ set it up - * to date until commit_write */ - if (from == 0 && to == PAGE_SIZE) + if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) RETURN(0); - /* If are writing to a new page, no need to read old data. - * the extent locking and getattr procedures in ll_file_write have - * guaranteed that i_size is stable enough for our zeroing needs */ - if (lli->lli_st_size <= offset) { - memset(kmap(page), 0, PAGE_SIZE); - kunmap(page); - GOTO(prepare_done, rc = 0); + rc = llu_inode_getattr(inode, lsm); + if (rc) { + llu_extent_unlock(fd, inode, lsm, mode, lockh); + RETURN(rc); } - rc = llu_brw(OBD_BRW_READ, inode, page, 0); + size_lock.start = lli->lli_st_size; + size_lock.end = OBD_OBJECT_EOF; - EXIT; + /* XXX I bet we should be checking the lock ignore flags.. */ + flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED; + matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock, + sizeof(size_lock), LCK_PR, &flags, inode, + &match_lockh); - prepare_done: - return rc; + /* hey, alright, we hold a size lock that covers the size we + * just found, its not going to change for a while.. 
*/ + if (matched == 1) { + set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags); + obd_cancel(exp, lsm, LCK_PR, &match_lockh); + } + + RETURN(0); } -static int llu_commit_write(struct inode *inode, struct page *page, - unsigned from, unsigned to) +int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + struct lustre_handle *lockh) { - struct llu_inode_info *lli = llu_i2info(inode); - loff_t size; + struct llu_sb_info *sbi = llu_i2sbi(inode); int rc; ENTRY; #if 0 - LASSERT(inode == file->f_dentry->d_inode); - LASSERT(PageLocked(page)); - - CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n", - inode, page, from, to, page->index); - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu,from=%d,to=%d\n", - inode->i_ino, from, to); - /* to match full page case in prepare_write */ - SetPageUptodate(page); - /* mark the page dirty, put it on mapping->dirty, - * mark the inode PAGES_DIRTY, put it on sb->dirty */ - set_page_dirty(page); + /* XXX phil: can we do this? won't it screw the file size up? 
*/ + if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || + (sbi->ll_flags & LL_SBI_NOLCK)) + RETURN(0); #endif - rc = llu_brw(OBD_BRW_WRITE, inode, page, 0); - if (rc) - return rc; + rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh); - /* this is matched by a hack in obdo_to_inode at the moment */ - size = (((obd_off)page->index) << PAGE_SHIFT) + to; - if (size > lli->lli_st_size) - lli->lli_st_size = size; + RETURN(rc); +} - RETURN(0); -} /* ll_commit_write */ +#define LLAP_MAGIC 12346789 -ssize_t -llu_generic_file_write(struct inode *inode, const char *buf, - size_t count, loff_t pos) +struct ll_async_page { + int llap_magic; + void *llap_cookie; + int llap_queued; + struct page *llap_page; + struct inode *llap_inode; +}; + +static struct ll_async_page *llap_from_cookie(void *cookie) { - struct page *page; - ssize_t written; - long status = 0; - int err; - unsigned bytes; - - if ((ssize_t) count < 0) - return -EINVAL; -#if 0 - down(&inode->i_sem); -#endif - if (pos < 0) - return -EINVAL; + struct ll_async_page *llap = cookie; + if (llap->llap_magic != LLAP_MAGIC) + return ERR_PTR(-EINVAL); + return llap; +}; - written = 0; +static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa) +{ + struct ll_async_page *llap; + struct inode *inode; + struct lov_stripe_md *lsm; + obd_flag valid_flags; + ENTRY; -#if 0 - remove_suid(inode); - update_inode_times(inode); -#endif + llap = llap_from_cookie(data); + if (IS_ERR(llap)) { + EXIT; + return; + } + + inode = llap->llap_inode; + lsm = llu_i2info(inode)->lli_smd; + + oa->o_id = lsm->lsm_object_id; + oa->o_valid = OBD_MD_FLID; + valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME; + if (cmd == OBD_BRW_WRITE) + valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME; + + obdo_from_inode(oa, inode, valid_flags); + EXIT; +} + +/* called for each page in a completed rpc.*/ +static void llu_ap_completion(void *data, int cmd, int rc) +{ + struct ll_async_page *llap; + struct page *page; + + llap = llap_from_cookie(data); + if 
(IS_ERR(llap)) { + EXIT; + return; + } + + llap->llap_queued = 0; + page = llap->llap_page; + + if (rc != 0) { + if (cmd == OBD_BRW_WRITE) + CERROR("writeback error on page %p index %ld: %d\n", + page, page->index, rc); + } + EXIT; +} + +static struct obd_async_page_ops llu_async_page_ops = { + .ap_make_ready = NULL, + .ap_refresh_count = NULL, + .ap_fill_obdo = llu_ap_fill_obdo, + .ap_completion = llu_ap_completion, +}; + +static +struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int npages) +{ + struct llu_sysio_cookie *cookie; + + OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(npages)); + if (cookie) { + I_REF(inode); + cookie->lsc_inode = inode; + cookie->lsc_npages = npages; + cookie->lsc_llap = (struct ll_async_page *)(cookie + 1); + cookie->lsc_pages = (struct page *) (cookie->lsc_llap + npages); + + osic_init(&cookie->lsc_osic); + } + + return cookie; +} + +static +void put_sysio_cookie(struct llu_sysio_cookie *cookie) +{ + struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd; + struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode); + struct ll_async_page *llap = cookie->lsc_llap; + int i; + + for (i = 0; i< cookie->lsc_npages; i++) { + if (llap[i].llap_cookie) + obd_teardown_async_page(exp, lsm, NULL, + llap[i].llap_cookie); + } + + I_RELE(cookie->lsc_inode); + + OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_npages)); +} + +static +int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd, + char *buf, loff_t pos, size_t count) +{ + struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd; + struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode); + struct page *pages = cookie->lsc_pages; + struct ll_async_page *llap = cookie->lsc_llap; + int i, rc, npages = 0; + ENTRY; + + if (!exp) + RETURN(-EINVAL); + + cookie->lsc_rwcount = count; + + /* prepare the pages array */ do { - unsigned long index, offset; - char *kaddr; - - /* - * Try to find the page in the cache. If it isn't there, - * allocate a free page. 
- */ - offset = (pos & (PAGE_CACHE_SIZE -1)); /* Within page */ - index = pos >> PAGE_CACHE_SHIFT; - bytes = PAGE_CACHE_SIZE - offset; - if (bytes > count) { - bytes = count; - } - - status = -ENOMEM; /* we'll assign it later anyway */ - page = __grab_cache_page(index); - if (!page) - break; - - kaddr = kmap(page); - status = llu_prepare_write(inode, page, offset, offset+bytes); - if (status) - goto sync_failure; - - memcpy(kaddr+offset, buf, bytes); - - status = llu_commit_write(inode, page, offset, offset+bytes); - if (!status) - status = bytes; - - if (status >= 0) { - written += status; - count -= status; - pos += status; - buf += status; - } -unlock: - kunmap(page); - page_cache_release(page); - - if (status < 0) - break; - } while (count); -done: - err = written ? written : status; + unsigned long index, offset, bytes; + + offset = (pos & ~PAGE_CACHE_MASK); + index = pos >> PAGE_CACHE_SHIFT; + bytes = PAGE_CACHE_SIZE - offset; + if (bytes > count) + bytes = count; + + /* prepare page for this index */ + pages[npages].index = index; + pages[npages].addr = buf - offset; + + pages[npages]._offset = offset; + pages[npages]._count = bytes; + + npages++; + count -= bytes; + pos += bytes; + buf += bytes; + } while (count); + + for (i = 0; i < npages; i++) { + llap[i].llap_magic = LLAP_MAGIC; + rc = obd_prep_async_page(exp, lsm, NULL, &pages[i], + (obd_off)pages[i].index << PAGE_SHIFT, + &llu_async_page_ops, + &llap[i], &llap[i].llap_cookie); + if (rc) { + llap[i].llap_cookie = NULL; + RETURN(rc); + } + CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", + &llap[i], &pages[i], llap[i].llap_cookie, + (obd_off)pages[i].index << PAGE_SHIFT); + pages[i].private = (unsigned long)&llap[i]; + llap[i].llap_page = &pages[i]; + llap[i].llap_inode = cookie->lsc_inode; + + rc = obd_queue_sync_io(exp, lsm, NULL, &cookie->lsc_osic, + llap[i].llap_cookie, cmd, + pages[i]._offset, pages[i]._count, 0); + if (rc) + RETURN(rc); + + llap[i].llap_queued = 1; + } -#if 0 - 
up(&inode->i_sem); -#endif - return err; - - status = -EFAULT; - goto unlock; - -sync_failure: - /* - * If blocksize < pagesize, prepare_write() may have instantiated a - * few blocks outside i_size. Trim these off again. - */ - kunmap(page); - page_cache_release(page); - goto done; + RETURN(0); } -ssize_t llu_file_write(struct inode *inode, const struct iovec *iovec, - size_t iovlen, loff_t pos) +static +int llu_start_async_io(struct llu_sysio_cookie *cookie) +{ + struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd; + struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode); + + return obd_trigger_sync_io(exp, lsm, NULL, &cookie->lsc_osic); +} + +/* + * read/write a continuous buffer for an inode (zero-copy) + */ +struct llu_sysio_cookie* +llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos) +{ + struct llu_sysio_cookie *cookie; + int max_pages, rc; + ENTRY; + + max_pages = (count >> PAGE_SHIFT) + 2; + + cookie = get_sysio_cookie(inode, max_pages); + if (!cookie) + RETURN(ERR_PTR(-ENOMEM)); + + rc = llu_prep_async_io(cookie, cmd, buf, pos, count); + if (rc) + GOTO(out_cleanup, rc); + + rc = llu_start_async_io(cookie); + if (rc) + GOTO(out_cleanup, rc); + +/* + rc = osic_wait(&osic); + if (rc) { + CERROR("file i/o error!\n"); + rw_count = rc; + } +*/ + RETURN(cookie); + +out_cleanup: + put_sysio_cookie(cookie); + RETURN(ERR_PTR(rc)); +} + +struct llu_sysio_callback_args* +llu_file_write(struct inode *inode, const struct iovec *iovec, + size_t iovlen, loff_t pos) { struct llu_inode_info *lli = llu_i2info(inode); - struct ll_file_data *fd = lli->lli_file_data; /* XXX not ready don't use it now */ - struct lustre_handle lockh = { 0 }; + struct ll_file_data *fd = lli->lli_file_data; + struct lustre_handle lockh = {0}; struct lov_stripe_md *lsm = lli->lli_smd; + struct llu_sysio_callback_args *lsca; + struct llu_sysio_cookie *cookie; struct ldlm_extent extent; ldlm_error_t err; - ssize_t retval = 0; + int iovidx; ENTRY; /* XXX 
consider other types later */ if (!S_ISREG(lli->lli_st_mode)) LBUG(); -#if 0 - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu,size="LPSZ",offset=%Ld\n", - inode->i_ino, count, *ppos); - - /* - * sleep doing some writeback work of this mount's dirty data - * if the VM thinks we're low on memory.. other dirtying code - * paths should think about doing this, too, but they should be - * careful not to hold locked pages while they do so. like - * ll_prepare_write. *cough* - */ - ll_check_dirty(inode->i_sb); -#endif - while (iovlen--) { - const char *buf = iovec[iovlen].iov_base; - size_t count = iovec[iovlen].iov_len; - /* POSIX, but surprised the VFS doesn't check this already */ + LASSERT(iovlen <= MAX_IOVEC); + + OBD_ALLOC(lsca, sizeof(*lsca)); + if (!lsca) + RETURN(ERR_PTR(-ENOMEM)); + + /* FIXME optimize the following extent locking */ + for (iovidx = 0; iovidx < iovlen; iovidx++) { + char *buf = iovec[iovidx].iov_base; + size_t count = iovec[iovidx].iov_len; + if (count == 0) continue; + /* FIXME libsysio haven't consider the open flags + * such as O_APPEND */ #if 0 if (!S_ISBLK(lli->lli_st_mode) && file->f_flags & O_APPEND) { extent.start = 0; @@ -360,26 +634,48 @@ ssize_t llu_file_write(struct inode *inode, const struct iovec *iovec, err = llu_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh); if (err != ELDLM_OK) - RETURN(-ENOLCK); - -#if 0 - if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) - *ppos = inode->i_size; + GOTO(err_out, err = -ENOLCK); CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n", - inode->i_ino, count, *ppos); -#endif - retval += llu_generic_file_write(inode, buf, count, pos); + lli->lli_st_ino, count, pos); + + cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos); + if (!IS_ERR(cookie)) { + /* save cookie */ + lsca->cookies[lsca->ncookies++] = cookie; + pos += count; + /* file size grow. XXX should be done here? 
*/ + if (pos > lli->lli_st_size) { + lli->lli_st_size = pos; + set_bit(LLI_F_PREFER_EXTENDED_SIZE, + &lli->lli_flags); + } + } else { + llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh); + GOTO(err_out, err = PTR_ERR(cookie)); + } + + /* XXX errors? */ + err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh); + if (err) + CERROR("extent unlock error %d\n", err); + } + + RETURN(lsca); + +err_out: + /* teardown all async stuff */ + while (lsca->ncookies--) { + put_sysio_cookie(lsca->cookies[lsca->ncookies]); } + OBD_FREE(lsca, sizeof(*lsca)); - /* XXX errors? */ - ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh); - return(retval); + RETURN(ERR_PTR(err)); } +#if 0 static void llu_update_atime(struct inode *inode) { -#if 0 struct llu_inode_info *lli = llu_i2info(inode); #ifdef USE_ATIME @@ -398,122 +694,109 @@ static void llu_update_atime(struct inode *inode) /* update atime, but don't explicitly write it out just this change */ inode->i_atime = CURRENT_TIME; #endif -#endif -} - -static size_t llu_generic_file_read(struct inode *inode, char *buf, - size_t count, loff_t pos) -{ - struct llu_inode_info *lli = llu_i2info(inode); - unsigned long index, offset; - int error = 0; - size_t readed = 0; - - index = pos >> PAGE_CACHE_SHIFT; - offset = pos & ~PAGE_CACHE_MASK; - - do { - struct page *page; - unsigned long end_index, nr; - - end_index = lli->lli_st_size >> PAGE_CACHE_SHIFT; - - if (index > end_index) - break; - nr = PAGE_CACHE_SIZE; - if (index == end_index) { - nr = lli->lli_st_size & ~PAGE_CACHE_MASK; - if (nr <= offset) - break; - } - - nr = nr - offset; - if (nr > count) - nr = count; - - page = grab_cache_page(index); - if (!page) { - error = -ENOMEM; - break; - } - - error = llu_brw(OBD_BRW_READ, inode, page, 0); - if (error) { - page_cache_release(page); - break; - } - - memcpy(buf, kmap(page)+offset, nr); - offset += nr; - index += offset >> PAGE_CACHE_SHIFT; - offset &= ~PAGE_CACHE_MASK; - readed += nr; - count -= nr; - - page_cache_release(page); - } 
while (count); - - if (error) - return error; - return readed; } +#endif -ssize_t llu_file_read(struct inode *inode, const struct iovec *iovec, +struct llu_sysio_callback_args* +llu_file_read(struct inode *inode, const struct iovec *iovec, size_t iovlen, loff_t pos) { struct llu_inode_info *lli = llu_i2info(inode); struct ll_file_data *fd = lli->lli_file_data; struct lov_stripe_md *lsm = lli->lli_smd; struct lustre_handle lockh = { 0 }; -#if 0 - struct ll_read_extent rextent; -#else struct ldlm_extent extent; -#endif + struct llu_sysio_callback_args *lsca; + struct llu_sysio_cookie *cookie; + int iovidx; + ldlm_error_t err; - ssize_t retval = 0; ENTRY; - while (iovlen--) { - char *buf = iovec[iovlen].iov_base; - size_t count = iovec[iovlen].iov_len; + OBD_ALLOC(lsca, sizeof(*lsca)); + if (!lsca) + RETURN(ERR_PTR(-ENOMEM)); + + for (iovidx = 0; iovidx < iovlen; iovidx++) { + char *buf = iovec[iovidx].iov_base; + size_t count = iovec[iovidx].iov_len; /* "If nbyte is 0, read() will return 0 and have no other results." 
* -- Single Unix Spec */ if (count == 0) - RETURN(0); + continue; -#if 0 - rextent.re_extent.start = pos; - rextent.re_extent.end = pos + count - 1; -#else extent.start = pos; extent.end = pos + count - 1; -#endif + err = llu_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh); if (err != ELDLM_OK) - RETURN(-ENOLCK); -#if 0 - rextent.re_task = current; - spin_lock(&lli->lli_read_extent_lock); - list_add(&rextent.re_lli_item, &lli->lli_read_extents); - spin_unlock(&lli->lli_read_extent_lock); -#endif + GOTO(err_out, err = -ENOLCK); + CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n", lli->lli_st_ino, count, pos); - retval = llu_generic_file_read(inode, buf, count, pos); + + cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos); + if (!IS_ERR(cookie)) { + /* save cookie */ + lsca->cookies[lsca->ncookies++] = cookie; + pos += count; + } else { + llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh); + GOTO(err_out, err = PTR_ERR(cookie)); + } + + /* XXX errors? */ + err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh); + if (err) + CERROR("extent_unlock fail: %d\n", err); + } #if 0 - spin_lock(&lli->lli_read_extent_lock); - list_del(&rextent.re_lli_item); - spin_unlock(&lli->lli_read_extent_lock); + if (readed > 0) + llu_update_atime(inode); #endif - } + RETURN(lsca); - if (retval > 0) - llu_update_atime(inode); +err_out: + /* teardown all async stuff */ + while (lsca->ncookies--) { + put_sysio_cookie(lsca->cookies[lsca->ncookies]); + } + OBD_FREE(lsca, sizeof(*lsca)); - /* XXX errors? 
*/ - ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh); - RETURN(retval); + RETURN(ERR_PTR(err)); } +int llu_iop_iodone(struct ioctx *ioctxp) +{ + struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private; + struct llu_sysio_cookie *cookie; + int i, err = 0, rc = 0; + ENTRY; + + /* write/read(fd, buf, 0) */ + if (!lsca) + return 1; + + LASSERT(!IS_ERR(lsca)); + + for (i = 0; i < lsca->ncookies; i++) { + cookie = lsca->cookies[i]; + if (cookie) { + err = osic_wait(&cookie->lsc_osic); + if (err && !rc) + rc = err; + if (!rc) + ioctxp->ioctx_cc += cookie->lsc_rwcount; + put_sysio_cookie(cookie); + } + } + + if (rc) + ioctxp->ioctx_cc = rc; + + OBD_FREE(lsca, sizeof(*lsca)); + ioctxp->ioctx_private = NULL; + + RETURN(1); +} diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 087d194..af3d3aa 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -25,11 +25,17 @@ #include #include -#include #include #include #include +#include +#include #include +#ifndef __CYGWIN__ +# include +#else +# include +#endif #include #include @@ -41,7 +47,28 @@ static void llu_fsop_gone(struct filesys *fs) { - /* FIXME */ + struct llu_sb_info *sbi = (struct llu_sb_info *) fs->fs_private; + struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp); + struct ll_fid rootfid; + ENTRY; + + list_del(&sbi->ll_conn_chain); + obd_disconnect(sbi->ll_osc_exp, 0); + + /* NULL request to force sync on the MDS, and get the last_committed + * value to flush remaining RPCs from the sending queue on client. + * + * XXX This should be an mdc_sync() call to sync the whole MDS fs, + * which we can call for other reasons as well. 
+ */ + if (!obd->obd_no_recov) + mdc_getstatus(sbi->ll_mdc_exp, &rootfid); + + obd_disconnect(sbi->ll_mdc_exp, 0); + + OBD_FREE(sbi, sizeof(*sbi)); + + EXIT; } static struct inode_ops llu_inode_ops; @@ -53,11 +80,18 @@ void llu_update_inode(struct inode *inode, struct mds_body *body, LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); if (lsm != NULL) { - if (lli->lli_smd == NULL) + if (lli->lli_smd == NULL) { lli->lli_smd = lsm; - else - LASSERT (!memcmp (lli->lli_smd, lsm, - sizeof (*lsm))); + lli->lli_maxbytes = lsm->lsm_maxbytes; + if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) + lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; + } else { + if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) { + CERROR("lsm mismatch for inode %ld\n", + lli->lli_st_ino); + LBUG(); + } + } } if (body->valid & OBD_MD_FLID) @@ -104,6 +138,12 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) valid &= src->o_valid; + if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) + CDEBUG(D_INODE, "valid %x, cur time %lu/%lu, new %lu/%lu\n", + src->o_valid, + LTIME_S(lli->lli_st_mtime), LTIME_S(lli->lli_st_ctime), + (long)src->o_mtime, (long)src->o_ctime); + if (valid & OBD_MD_FLATIME) LTIME_S(lli->lli_st_atime) = src->o_atime; if (valid & OBD_MD_FLMTIME) @@ -131,72 +171,153 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) if (valid & OBD_MD_FLGENER) lli->lli_st_generation = src->o_generation; if (valid & OBD_MD_FLRDEV) - lli->lli_st_rdev = src->o_rdev; + lli->lli_st_rdev = to_kdev_t(src->o_rdev); } +#define S_IRWXUGO (S_IRWXU|S_IRWXG|S_IRWXO) +#define S_IALLUGO (S_ISUID|S_ISGID|S_ISVTX|S_IRWXUGO) + void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) { struct llu_inode_info *lli = llu_i2info(src); + obd_flag newvalid = 0; - if (valid & OBD_MD_FLATIME) + if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) + CDEBUG(D_INODE, "valid %x, new time %lu/%lu\n", + valid, LTIME_S(lli->lli_st_mtime), + LTIME_S(lli->lli_st_ctime)); + + if (valid & 
OBD_MD_FLATIME) { dst->o_atime = LTIME_S(lli->lli_st_atime); - if (valid & OBD_MD_FLMTIME) + newvalid |= OBD_MD_FLATIME; + } + if (valid & OBD_MD_FLMTIME) { dst->o_mtime = LTIME_S(lli->lli_st_mtime); - if (valid & OBD_MD_FLCTIME) + newvalid |= OBD_MD_FLMTIME; + } + if (valid & OBD_MD_FLCTIME) { dst->o_ctime = LTIME_S(lli->lli_st_ctime); - if (valid & OBD_MD_FLSIZE) + newvalid |= OBD_MD_FLCTIME; + } + if (valid & OBD_MD_FLSIZE) { dst->o_size = lli->lli_st_size; - if (valid & OBD_MD_FLBLOCKS) /* allocation of space */ + newvalid |= OBD_MD_FLSIZE; + } + if (valid & OBD_MD_FLBLOCKS) { /* allocation of space (x512 bytes) */ dst->o_blocks = lli->lli_st_blocks; - if (valid & OBD_MD_FLBLKSZ) + newvalid |= OBD_MD_FLBLOCKS; + } + if (valid & OBD_MD_FLBLKSZ) { /* optimal block size */ dst->o_blksize = lli->lli_st_blksize; - if (valid & OBD_MD_FLTYPE) - dst->o_mode = (dst->o_mode & ~S_IFMT) | (lli->lli_st_mode & S_IFMT); - if (valid & OBD_MD_FLMODE) - dst->o_mode = (dst->o_mode & S_IFMT) | (lli->lli_st_mode & ~S_IFMT); - if (valid & OBD_MD_FLUID) + newvalid |= OBD_MD_FLBLKSZ; + } + if (valid & OBD_MD_FLTYPE) { + dst->o_mode = (dst->o_mode & S_IALLUGO)|(lli->lli_st_mode & S_IFMT); + newvalid |= OBD_MD_FLTYPE; + } + if (valid & OBD_MD_FLMODE) { + dst->o_mode = (dst->o_mode & S_IFMT)|(lli->lli_st_mode & S_IALLUGO); + newvalid |= OBD_MD_FLMODE; + } + if (valid & OBD_MD_FLUID) { dst->o_uid = lli->lli_st_uid; - if (valid & OBD_MD_FLGID) + newvalid |= OBD_MD_FLUID; + } + if (valid & OBD_MD_FLGID) { dst->o_gid = lli->lli_st_gid; - if (valid & OBD_MD_FLFLAGS) + newvalid |= OBD_MD_FLGID; + } + if (valid & OBD_MD_FLFLAGS) { dst->o_flags = lli->lli_st_flags; - if (valid & OBD_MD_FLNLINK) + newvalid |= OBD_MD_FLFLAGS; + } + if (valid & OBD_MD_FLNLINK) { dst->o_nlink = lli->lli_st_nlink; - if (valid & OBD_MD_FLGENER) + newvalid |= OBD_MD_FLNLINK; + } + if (valid & OBD_MD_FLGENER) { dst->o_generation = lli->lli_st_generation; - if (valid & OBD_MD_FLRDEV) - dst->o_rdev = 
(__u32)(lli->lli_st_rdev); + newvalid |= OBD_MD_FLGENER; + } + if (valid & OBD_MD_FLRDEV) { + dst->o_rdev = (__u32)kdev_t_to_nr(lli->lli_st_rdev); + newvalid |= OBD_MD_FLRDEV; + } - dst->o_valid |= (valid & ~OBD_MD_FLID); + dst->o_valid |= newvalid; } -static int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm) +/* + * really does the getattr on the inode and updates its fields + */ +int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm) { - struct llu_sb_info *sbi = llu_i2sbi(inode); + struct llu_inode_info *lli = llu_i2info(inode); + struct obd_export *exp = llu_i2obdexp(inode); + struct ptlrpc_request_set *set; struct obdo oa; + obd_flag refresh_valid; int rc; ENTRY; LASSERT(lsm); - LASSERT(sbi); + LASSERT(lli); memset(&oa, 0, sizeof oa); oa.o_id = lsm->lsm_object_id; oa.o_mode = S_IFREG; oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | - OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME; + OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | + OBD_MD_FLCTIME; - rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm); + set = ptlrpc_prep_set(); + if (set == NULL) { + CERROR ("ENOMEM allocing request set\n"); + rc = -ENOMEM; + } else { + rc = obd_getattr_async(exp, &oa, lsm, set); + if (rc == 0) + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); + } if (rc) RETURN(rc); - obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); + refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | + OBD_MD_FLCTIME | OBD_MD_FLSIZE; + + /* We set this flag in commit write as we extend the file size. When + * the bit is set and the lock is canceled that covers the file size, + * we clear the bit. This is enough to protect the window where our + * local size extension is needed for writeback. However, it relies on + * behaviour that won't be true in the near future. This assumes that + * all getattr callers get extent locks, which they currnetly do. 
It + * also assumes that we only send discarding asts for {0,eof} truncates + * as is currently the case. This will have to be replaced by the + * proper eoc communication between clients and the ost, which is on + * its way. */ + if (test_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags)) { + if (oa.o_size < lli->lli_st_size) + refresh_valid &= ~OBD_MD_FLSIZE; + else + clear_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags); + } + + obdo_refresh_inode(inode, &oa, refresh_valid); +/* + if (inode->i_blksize < PAGE_CACHE_SIZE) + inode->i_blksize = PAGE_CACHE_SIZE; + + CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n", + lsm->lsm_object_id, inode->i_size, inode->i_blocks, + inode->i_blksize); +*/ RETURN(0); } -struct inode* llu_new_inode(struct filesys *fs, ino_t ino, mode_t mode) +static struct inode* llu_new_inode(struct filesys *fs, + struct ll_fid *fid) { struct inode *inode; struct llu_inode_info *lli; @@ -210,17 +331,22 @@ struct inode* llu_new_inode(struct filesys *fs, ino_t ino, mode_t mode) lli->lli_smd = NULL; lli->lli_symlink_name = NULL; lli->lli_flags = 0; - INIT_LIST_HEAD(&lli->lli_read_extents); + lli->lli_maxbytes = (__u64)(~0UL); lli->lli_file_data = NULL; - /* could file_identifier be 0 ? FIXME */ - inode = _sysio_i_new(fs, ino, NULL, + lli->lli_sysio_fid.fid_data = &lli->lli_fid; + lli->lli_sysio_fid.fid_len = sizeof(lli->lli_fid); + + memcpy(&lli->lli_fid, fid, sizeof(*fid)); + + /* file identifier is needed by functions like _sysio_i_find() */ + inode = _sysio_i_new(fs, &lli->lli_sysio_fid, #ifndef AUTOMOUNT_FILE_NAME - mode & S_IFMT, + fid->f_type & S_IFMT, #else - mode, /* all of the bits! */ + fid->f_type, /* all of the bits! 
*/ #endif - 0, + 0, 0, &llu_inode_ops, lli); if (!inode) @@ -229,113 +355,165 @@ struct inode* llu_new_inode(struct filesys *fs, ino_t ino, mode_t mode) return inode; } -static int llu_iop_lookup(struct pnode *pnode, - struct inode **inop, - struct intent *intnt __IS_UNUSED, - const char *path __IS_UNUSED) +#if 0 +static int ll_intent_to_lock_mode(struct lookup_intent *it) { - struct pnode_base *pb_dir = pnode->p_parent->p_base; - struct ptlrpc_request *request = NULL; - struct llu_sb_info *sbi = llu_i2sbi(pb_dir->pb_ino); - struct ll_fid *fid = &llu_i2info(pb_dir->pb_ino)->lli_fid; - struct qstr *name = &pnode->p_base->pb_name; - struct mds_body *body; - unsigned long valid; - char *pname; - int rc, easize; - struct ll_read_inode2_cookie lic = {.lic_body = NULL, .lic_lsm = NULL}; - - /* the mount root inode have no name, so don't call - * remote in this case. but probably we need revalidate - * it here? FIXME */ - if (pnode->p_mount->mnt_root == pnode) { - struct inode *i = pnode->p_base->pb_ino; - I_REF(i); - *inop = i; - return 0; - } - - if (!name->len) - return -EINVAL; - - /* mdc_getattr_name require NULL-terminated name */ - OBD_ALLOC(pname, name->len + 1); - if (!pname) - return -ENOMEM; - memcpy(pname, name->name, name->len); - pname[name->len] = 0; - - valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE; - - /* FIXME before getattr_name, we don't know whether - * the inode we are finding is regular or not, so here - * we blindly require server feed in EA data */ - easize = obd_size_diskmd(&sbi->ll_osc_conn, NULL); - valid |= OBD_MD_FLEASIZE; - - rc = mdc_getattr_name(&sbi->ll_mdc_conn, fid, - pname, name->len + 1, - valid, easize, &request); - if (rc < 0) { - CERROR("mdc_getattr_name: %d\n", rc); - rc = -ENOENT; - goto out; + /* CREAT needs to be tested before open (both could be set) */ + if (it->it_op & IT_CREAT) + return LCK_PW; + else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) + return LCK_PR; + + LBUG(); + RETURN(-EINVAL); +} 
+#endif + +#if 0 +int ll_it_open_error(int phase, struct lookup_intent *it) +{ + if (it_disposition(it, DISP_OPEN_OPEN)) { + if (phase == DISP_OPEN_OPEN) + return it->d.lustre.it_status; + else + return 0; } - body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); - *inop = llu_new_inode(pnode->p_mount->mnt_fs, body->ino, body->mode); - if (!inop) - goto out; - - lic.lic_body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*lic.lic_body)); - LASSERT (lic.lic_body != NULL); - LASSERT_REPSWABBED (request, 0); - - if (S_ISREG(lic.lic_body->mode) && - lic.lic_body->valid & OBD_MD_FLEASIZE) { - struct lov_mds_md *lmm; - int lmm_size; - int rc; - - lmm_size = lic.lic_body->eadatasize; - if (lmm_size == 0) { - CERROR ("OBD_MD_FLEASIZE set but eadatasize 0\n"); - RETURN (-EPROTO); + if (it_disposition(it, DISP_OPEN_CREATE)) { + if (phase == DISP_OPEN_CREATE) + return it->d.lustre.it_status; + else + return 0; + } + + if (it_disposition(it, DISP_LOOKUP_EXECD)) { + if (phase == DISP_LOOKUP_EXECD) + return it->d.lustre.it_status; + else + return 0; + } + CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition, it->d.lustre.it_status); + LBUG(); + return 0; +} +#endif + +static int llu_have_md_lock(struct inode *inode) +{ + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct llu_inode_info *lli = llu_i2info(inode); + struct lustre_handle lockh; + struct ldlm_res_id res_id = { .name = {0} }; + struct obd_device *obddev; + int flags; + ENTRY; + + LASSERT(inode); + + obddev = sbi->ll_mdc_exp->exp_obd; + res_id.name[0] = lli->lli_st_ino; + res_id.name[1] = lli->lli_st_generation; + + CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]); + + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, + NULL, 0, LCK_PR, &lockh)) { + ldlm_lock_decref(&lockh, LCK_PR); + RETURN(1); + } + + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, + NULL, 0, LCK_PW, &lockh)) { + 
ldlm_lock_decref(&lockh, LCK_PW); + RETURN(1); + } + RETURN(0); +} + +static int llu_inode_revalidate(struct inode *inode) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct lov_stripe_md *lsm = NULL; + ENTRY; + + if (!inode) { + CERROR("REPORT THIS LINE TO PETER\n"); + RETURN(0); + } + + if (!llu_have_md_lock(inode)) { + struct lustre_md md; + struct ptlrpc_request *req = NULL; + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct ll_fid fid; + unsigned long valid = 0; + int rc, ealen = 0; + + /* Why don't we update all valid MDS fields here, if we're + * doing an RPC anyways? -phil */ + if (S_ISREG(lli->lli_st_mode)) { + ealen = obd_size_diskmd(sbi->ll_osc_exp, NULL); + valid |= OBD_MD_FLEASIZE; + } + ll_inode2fid(&fid, inode); + rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req); + if (rc) { + CERROR("failure %d inode %lu\n", rc, lli->lli_st_ino); + RETURN(-abs(rc)); + } + rc = mdc_req2lustre_md(req, 0, sbi->ll_osc_exp, &md); + + /* XXX Too paranoid? */ + if (((md.body->valid ^ valid) & OBD_MD_FLEASIZE) && + !((md.body->valid & OBD_MD_FLNLINK) && + (md.body->nlink == 0))) { + CERROR("Asked for %s eadata but got %s (%d)\n", + (valid & OBD_MD_FLEASIZE) ? "some" : "no", + (md.body->valid & OBD_MD_FLEASIZE) ? 
"some":"none", + md.body->eadatasize); } - lmm = lustre_msg_buf(request->rq_repmsg, 0 + 1, lmm_size); - LASSERT(lmm != NULL); - LASSERT_REPSWABBED (request, 0 + 1); - - rc = obd_unpackmd (&sbi->ll_osc_conn, - &lic.lic_lsm, lmm, lmm_size); - if (rc < 0) { - CERROR ("Error %d unpacking eadata\n", rc); - RETURN (rc); + if (rc) { + ptlrpc_req_finished(req); + RETURN(rc); } - LASSERT (rc >= sizeof (*lic.lic_lsm)); - } else { - lic.lic_lsm = NULL; - } - llu_update_inode(*inop, body, lic.lic_lsm); + llu_update_inode(inode, md.body, md.lsm); + if (md.lsm != NULL && llu_i2info(inode)->lli_smd != md.lsm) + obd_free_memmd(sbi->ll_osc_exp, &md.lsm); - if (llu_i2info(*inop)->lli_smd) { - rc = llu_inode_getattr(*inop, llu_i2info(*inop)->lli_smd); - if (rc) - _sysio_i_gone(*inop); + if (md.body->valid & OBD_MD_FLSIZE) + set_bit(LLI_F_HAVE_MDS_SIZE_LOCK, + &llu_i2info(inode)->lli_flags); + ptlrpc_req_finished(req); } -out: - ptlrpc_req_finished(request); - OBD_FREE(pname, name->len + 1); + lsm = llu_i2info(inode)->lli_smd; + if (!lsm) /* object not yet allocated, don't validate size */ + RETURN(0); - return rc; + /* + * unfortunately stat comes in through revalidate and we don't + * differentiate this use from initial instantiation. we're + * also being wildly conservative and flushing write caches + * so that stat really returns the proper size. 
+ */ + { + struct ldlm_extent extent = {0, OBD_OBJECT_EOF}; + struct lustre_handle lockh = {0}; + ldlm_error_t err; + + err = llu_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh); + if (err != ELDLM_OK) + RETURN(err); + + llu_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh); + } + RETURN(0); } -static int llu_iop_getattr(struct pnode *pno, - struct inode *ino, - struct intnl_stat *b) +static void copy_stat_buf(struct inode *ino, struct intnl_stat *b) { struct llu_inode_info *lli = llu_i2info(ino); @@ -352,46 +530,75 @@ static int llu_iop_getattr(struct pnode *pno, b->st_atime = lli->lli_st_atime; b->st_mtime = lli->lli_st_mtime; b->st_ctime = lli->lli_st_ctime; - - return 0; } -int llu_mdc_cancel_unused(struct lustre_handle *conn, - struct llu_inode_info *lli, - int flags) +static int llu_iop_getattr(struct pnode *pno, + struct inode *ino, + struct intnl_stat *b) { - struct ldlm_res_id res_id = - { .name = {lli->lli_st_ino, lli->lli_st_generation} }; - struct obd_device *obddev = class_conn2obd(conn); + int rc; ENTRY; - RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags)); + + if (!ino) { + LASSERT(pno); + LASSERT(pno->p_base->pb_ino); + ino = pno->p_base->pb_ino; + } else { + LASSERT(!pno || pno->p_base->pb_ino == ino); + } + + /* libsysio might call us directly without intent lock, + * we must re-fetch the attrs here + */ + rc = llu_inode_revalidate(ino); + if (!rc) { + copy_stat_buf(ino, b); + + if (llu_i2info(ino)->lli_it) { + struct lookup_intent *it; + + LL_GET_INTENT(ino, it); + it->it_op_release(it); + OBD_FREE(it, sizeof(*it)); + } + } + + RETURN(rc); } -static void llu_clear_inode(struct inode *inode) +static int null_if_equal(struct ldlm_lock *lock, void *data) { - struct llu_sb_info *sbi = llu_i2sbi(inode); + if (data == lock->l_ast_data) + lock->l_ast_data = NULL; + + if (lock->l_req_mode != lock->l_granted_mode) + return LDLM_ITER_STOP; + + return LDLM_ITER_CONTINUE; +} + +void llu_clear_inode(struct inode *inode) +{ + struct 
ll_fid fid; struct llu_inode_info *lli = llu_i2info(inode); - int rc; + struct llu_sb_info *sbi = llu_i2sbi(inode); ENTRY; - CDEBUG(D_INODE, "clear inode: %lu\n", lli->lli_st_ino); - rc = llu_mdc_cancel_unused(&sbi->ll_mdc_conn, lli, - LDLM_FL_NO_CALLBACK); - if (rc < 0) { - CERROR("ll_mdc_cancel_unused: %d\n", rc); - /* XXX FIXME do something dramatic */ - } + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%lu(%p)\n", lli->lli_st_ino, + lli->lli_st_generation, inode); - if (lli->lli_smd) { - rc = obd_cancel_unused(&sbi->ll_osc_conn, lli->lli_smd, 0); - if (rc < 0) { - CERROR("obd_cancel_unused: %d\n", rc); - /* XXX FIXME do something dramatic */ - } - } + ll_inode2fid(&fid, inode); + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(lli->lli_flags)); + mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode); if (lli->lli_smd) - obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd); + obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd, + null_if_equal, inode); + + if (lli->lli_smd) { + obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd); + lli->lli_smd = NULL; + } if (lli->lli_symlink_name) { OBD_FREE(lli->lli_symlink_name, @@ -405,63 +612,214 @@ static void llu_clear_inode(struct inode *inode) void llu_iop_gone(struct inode *inode) { struct llu_inode_info *lli = llu_i2info(inode); + ENTRY; llu_clear_inode(inode); OBD_FREE(lli, sizeof(*lli)); + EXIT; } -static int llu_setattr_raw(struct inode *inode, struct iattr *attr) +static int inode_setattr(struct inode * inode, struct iattr * attr) { - struct ptlrpc_request *request = NULL; + unsigned int ia_valid = attr->ia_valid; + struct llu_inode_info *lli = llu_i2info(inode); + int error = 0; + + if (ia_valid & ATTR_SIZE) { + error = llu_vmtruncate(inode, attr->ia_size); + if (error) + goto out; + } + + if (ia_valid & ATTR_UID) + lli->lli_st_uid = attr->ia_uid; + if (ia_valid & ATTR_GID) + lli->lli_st_gid = attr->ia_gid; + if (ia_valid & ATTR_ATIME) + lli->lli_st_atime = attr->ia_atime; + if (ia_valid & ATTR_MTIME) + lli->lli_st_mtime = 
attr->ia_mtime; + if (ia_valid & ATTR_CTIME) + lli->lli_st_ctime = attr->ia_ctime; + if (ia_valid & ATTR_MODE) { + lli->lli_st_mode = attr->ia_mode; + if (!in_group_p(lli->lli_st_gid) && !capable(CAP_FSETID)) + lli->lli_st_mode &= ~S_ISGID; + } + /* mark_inode_dirty(inode); */ +out: + return error; +} + +/* If this inode has objects allocated to it (lsm != NULL), then the OST + * object(s) determine the file size and mtime. Otherwise, the MDS will + * keep these values until such a time that objects are allocated for it. + * We do the MDS operations first, as it is checking permissions for us. + * We don't to the MDS RPC if there is nothing that we want to store there, + * otherwise there is no harm in updating mtime/atime on the MDS if we are + * going to do an RPC anyways. + * + * If we are doing a truncate, we will send the mtime and ctime updates + * to the OST with the punch RPC, otherwise we do an explicit setattr RPC. + * I don't believe it is possible to get e.g. ATTR_MTIME_SET and ATTR_SIZE + * at the same time. 
+ */ +#define OST_ATTR (ATTR_MTIME | ATTR_MTIME_SET | ATTR_CTIME | \ + ATTR_ATIME | ATTR_ATIME_SET | ATTR_SIZE) +int llu_setattr_raw(struct inode *inode, struct iattr *attr) +{ + struct lov_stripe_md *lsm = llu_i2info(inode)->lli_smd; struct llu_sb_info *sbi = llu_i2sbi(inode); struct llu_inode_info *lli = llu_i2info(inode); + struct ptlrpc_request *request = NULL; struct mdc_op_data op_data; - int err = 0; + int ia_valid = attr->ia_valid; + int rc = 0; ENTRY; + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", lli->lli_st_ino); - /* if need truncate, do it at first */ - if (attr->ia_valid & ATTR_SIZE) { - printf("************* don't support truncate now !!!!!!!!\n"); - LBUG(); + if (ia_valid & ATTR_SIZE) { + if (attr->ia_size > ll_file_maxbytes(inode)) { + CDEBUG(D_INODE, "file too large %llu > "LPU64"\n", + attr->ia_size, ll_file_maxbytes(inode)); + RETURN(-EFBIG); + } + + attr->ia_valid |= ATTR_MTIME | ATTR_CTIME; } - /* Don't send size changes to MDS to avoid "fast EA" problems, and - * also avoid a pointless RPC (we get file size from OST anyways). - */ - attr->ia_valid &= ~ATTR_SIZE; - if (!attr->ia_valid) - RETURN(0); + /* We mark all of the fields "set" so MDS/OST does not re-set them */ + if (attr->ia_valid & ATTR_CTIME) { + attr->ia_ctime = CURRENT_TIME; + attr->ia_valid |= ATTR_CTIME_SET; + } + if (!(ia_valid & ATTR_ATIME_SET) && (attr->ia_valid & ATTR_ATIME)) { + attr->ia_atime = CURRENT_TIME; + attr->ia_valid |= ATTR_ATIME_SET; + } + if (!(ia_valid & ATTR_MTIME_SET) && (attr->ia_valid & ATTR_MTIME)) { + attr->ia_mtime = CURRENT_TIME; + attr->ia_valid |= ATTR_MTIME_SET; + } - llu_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); + if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME)) + CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n", + LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime), + LTIME_S(CURRENT_TIME)); + if (lsm) + attr->ia_valid &= ~ATTR_SIZE; + + /* If only OST attributes being set on objects, don't do MDS RPC. 
+ * In that case, we need to check permissions and update the local + * inode ourselves so we can call obdo_from_inode() always. */ + if (ia_valid & (lsm ? ~(OST_ATTR | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { + struct lustre_md md; + llu_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); + + rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, + attr, NULL, 0, NULL, 0, &request); + + if (rc) { + ptlrpc_req_finished(request); + if (rc != -EPERM && rc != -EACCES) + CERROR("mdc_setattr fails: rc = %d\n", rc); + RETURN(rc); + } - err = mdc_setattr(&sbi->ll_mdc_conn, &op_data, - attr, NULL, 0, &request); - if (err) - CERROR("mdc_setattr fails: err = %d\n", err); + rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); + if (rc) { + ptlrpc_req_finished(request); + RETURN(rc); + } + llu_update_inode(inode, md.body, md.lsm); + ptlrpc_req_finished(request); - ptlrpc_req_finished(request); + if (!md.lsm || !S_ISREG(lli->lli_st_mode)) { + CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); + RETURN(0); + } + } else { + /* The OST doesn't check permissions, but the alternative is + * a gratuitous RPC to the MDS. We already rely on the client + * to do read/write/truncate permission checks, so is mtime OK? 
+ */ + if (ia_valid & (ATTR_MTIME | ATTR_ATIME)) { + /* from sys_utime() */ + if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { + if (current->fsuid != lli->lli_st_uid && + (rc = ll_permission(inode, 0/*MAY_WRITE*/, NULL)) != 0) + RETURN(rc); + } else { + /* from inode_change_ok() */ + if (current->fsuid != lli->lli_st_uid && + !capable(CAP_FOWNER)) + RETURN(-EPERM); + } + } + + /* Won't invoke vmtruncate, as we already cleared ATTR_SIZE */ + inode_setattr(inode, attr); + } - if (S_ISREG(inode->i_mode) && attr->ia_valid & ATTR_MTIME_SET) { - struct lov_stripe_md *lsm = lli->lli_smd; + if (ia_valid & ATTR_SIZE) { + struct ldlm_extent extent = { .start = attr->ia_size, + .end = OBD_OBJECT_EOF }; + struct lustre_handle lockh = { 0 }; + int err, ast_flags = 0; + /* XXX when we fix the AST intents to pass the discard-range + * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA + * XXX here. */ + + /* Writeback uses inode->i_size to determine how far out + * its cached pages go. ll_truncate gets a PW lock, canceling + * our lock, _after_ it has updated i_size. this can confuse + * + * We really need to get our PW lock before we change + * inode->i_size. If we don't we can race with other + * i_size updaters on our node, like ll_file_read. We + * can also race with i_size propogation to other + * nodes through dirtying and writeback of final cached + * pages. This last one is especially bad for racing + * o_append users on other nodes. */ + if (extent.start == 0) + ast_flags = LDLM_AST_DISCARD_DATA; + rc = llu_extent_lock_no_validate(NULL, inode, lsm, LCK_PW, + &extent, &lockh, ast_flags); + if (rc != ELDLM_OK) { + if (rc > 0) + RETURN(-ENOLCK); + RETURN(rc); + } + + rc = llu_vmtruncate(inode, attr->ia_size); + if (rc == 0) + set_bit(LLI_F_HAVE_OST_SIZE_LOCK, + &llu_i2info(inode)->lli_flags); + + /* unlock now as we don't mind others file lockers racing with + * the mds updates below? 
*/ + err = llu_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); + if (err) { + CERROR("llu_extent_unlock failed: %d\n", err); + if (!rc) + rc = err; + } + } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { struct obdo oa; - int err2; CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", - lli->lli_st_ino, attr->ia_mtime); + lli->lli_st_ino, LTIME_S(attr->ia_mtime)); oa.o_id = lsm->lsm_object_id; - oa.o_mode = S_IFREG; - oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMTIME; - oa.o_mtime = attr->ia_mtime; - err2 = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL); - if (err2) { - CERROR("obd_setattr fails: rc=%d\n", err); - if (!err) - err = err2; - } + oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + rc = obd_setattr(sbi->ll_osc_exp, &oa, lsm, NULL); + if (rc) + CERROR("obd_setattr fails: rc=%d\n", rc); } - RETURN(err); + RETURN(rc); } /* FIXME here we simply act as a thin layer to glue it with @@ -473,6 +831,7 @@ static int llu_iop_setattr(struct pnode *pno, struct intnl_stat *stbuf) { struct iattr iattr; + ENTRY; memset(&iattr, 0, sizeof(iattr)); @@ -502,97 +861,519 @@ static int llu_iop_setattr(struct pnode *pno, } iattr.ia_valid |= ATTR_RAW; - /* FIXME FIXME FIXME FIXME FIXME FIXME FIXME - * without ATTR_FROM_OPEN, mds_reint_setattr will call - * mds_fid2locked_dentry() and deadlocked at completion_ast call. - * Here we workaround it and avoid any locking. 
- * FIXME FIXME FIXME FIXME FIXME FIXME FIXME - */ - iattr.ia_valid |= ATTR_FROM_OPEN; - return llu_setattr_raw(ino, &iattr); + RETURN(llu_setattr_raw(ino, &iattr)); } +#define EXT2_LINK_MAX 32000 -static int llu_mkdir2(struct inode *dir, const char *name, int len, int mode) +static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt) { + struct inode *dir = pno->p_base->pb_parent->pb_ino; + struct qstr *qstr = &pno->p_base->pb_name; + const char *name = qstr->name; + int len = qstr->len; struct ptlrpc_request *request = NULL; struct llu_sb_info *sbi = llu_i2sbi(dir); - struct llu_inode_info *lli = llu_i2info(dir); struct mdc_op_data op_data; int err = -EMLINK; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu\n", - name, lli->lli_st_ino); - /* FIXME check this later */ -#if 0 - if (dir->i_nlink >= EXT2_LINK_MAX) + if (llu_i2info(dir)->lli_st_nlink >= EXT2_LINK_MAX) RETURN(err); - mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR; -#endif - mode |= S_IFDIR; + llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0); - err = mdc_create(&sbi->ll_mdc_conn, &op_data, NULL, 0, mode, + err = mdc_create(sbi->ll_mdc_exp, &op_data, + tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO, current->fsuid, current->fsgid, 0, &request); ptlrpc_req_finished(request); RETURN(err); } -static int llu_iop_mkdir(struct pnode *pno, mode_t mode) +static int llu_readlink_internal(struct inode *inode, + struct ptlrpc_request **request, + char **symname) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct ll_fid fid; + struct mds_body *body; + int rc, symlen = lli->lli_st_size + 1; + ENTRY; + + *request = NULL; + + if (lli->lli_symlink_name) { + *symname = lli->lli_symlink_name; + CDEBUG(D_INODE, "using cached symlink %s\n", *symname); + RETURN(0); + } + + ll_inode2fid(&fid, inode); + rc = mdc_getattr(sbi->ll_mdc_exp, &fid, + OBD_MD_LINKNAME, symlen, request); + if (rc) { + CERROR("inode %lu: rc = %d\n", 
lli->lli_st_ino, rc); + RETURN(rc); + } + + body = lustre_msg_buf ((*request)->rq_repmsg, 0, sizeof (*body)); + LASSERT (body != NULL); + LASSERT_REPSWABBED (*request, 0); + + if ((body->valid & OBD_MD_LINKNAME) == 0) { + CERROR ("OBD_MD_LINKNAME not set on reply\n"); + GOTO (failed, rc = -EPROTO); + } + + LASSERT (symlen != 0); + if (body->eadatasize != symlen) { + CERROR ("inode %lu: symlink length %d not expected %d\n", + lli->lli_st_ino, body->eadatasize - 1, symlen - 1); + GOTO (failed, rc = -EPROTO); + } + + *symname = lustre_msg_buf ((*request)->rq_repmsg, 1, symlen); + if (*symname == NULL || + strnlen (*symname, symlen) != symlen - 1) { + /* not full/NULL terminated */ + CERROR ("inode %lu: symlink not NULL terminated string" + "of length %d\n", lli->lli_st_ino, symlen - 1); + GOTO (failed, rc = -EPROTO); + } + + OBD_ALLOC(lli->lli_symlink_name, symlen); + /* do not return an error if we cannot cache the symlink locally */ + if (lli->lli_symlink_name) + memcpy(lli->lli_symlink_name, *symname, symlen); + + RETURN(0); + + failed: + ptlrpc_req_finished (*request); + RETURN (-EPROTO); +} + +static int llu_iop_readlink(struct pnode *pno, char *data, size_t bufsize) +{ + struct inode *inode = pno->p_base->pb_ino; + struct ptlrpc_request *request; + char *symname; + int rc; + ENTRY; + + /* on symlinks lli_open_sem protects lli_symlink_name allocation/data */ +/* + down(&lli->lli_open_sem); +*/ + rc = llu_readlink_internal(inode, &request, &symname); + if (rc) + GOTO(out, rc); + + LASSERT(symname); + strncpy(data, symname, bufsize); + + ptlrpc_req_finished(request); + out: +/* + up(&lli->lli_open_sem); +*/ + RETURN(rc); +} + +static int llu_iop_mknod_raw(struct pnode *pno, + mode_t mode, + dev_t dev) +{ + struct ptlrpc_request *request = NULL; + struct inode *dir = pno->p_parent->p_base->pb_ino; + struct llu_sb_info *sbi = llu_i2sbi(dir); + struct mdc_op_data op_data; + int err = -EMLINK; + ENTRY; + + CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu\n", + 
pno->p_base->pb_name.name, llu_i2info(dir)->lli_st_ino); + + if (llu_i2info(dir)->lli_st_nlink >= EXT2_LINK_MAX) + RETURN(err); + + mode &= ~current->fs->umask; + + switch (mode & S_IFMT) { + case 0: + case S_IFREG: + mode |= S_IFREG; /* for mode = 0 case, fallthrough */ + case S_IFCHR: + case S_IFBLK: + case S_IFIFO: + case S_IFSOCK: + llu_prepare_mdc_op_data(&op_data, dir, NULL, + pno->p_base->pb_name.name, + pno->p_base->pb_name.len, + 0); + err = mdc_create(sbi->ll_mdc_exp, &op_data, NULL, 0, mode, + current->fsuid, current->fsgid, dev, &request); + ptlrpc_req_finished(request); + break; + case S_IFDIR: + err = -EPERM; + break; + default: + err = -EINVAL; + } + RETURN(err); +} + +#if 0 +static int llu_mdc_unlink(struct inode *dir, struct inode *child, __u32 mode, + const char *name, int len) +{ + struct ptlrpc_request *request = NULL; + struct mds_body *body; + struct lov_mds_md *eadata; + struct lov_stripe_md *lsm = NULL; + struct obd_trans_info oti = { 0 }; + struct mdc_op_data op_data; + struct obdo *oa; + int rc; + ENTRY; + + llu_prepare_mdc_op_data(&op_data, dir, child, name, len, mode); + rc = mdc_unlink(&llu_i2sbi(dir)->ll_mdc_conn, &op_data, &request); + if (rc) + GOTO(out, rc); + /* req is swabbed so this is safe */ + body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); + + if (!(body->valid & OBD_MD_FLEASIZE)) + GOTO(out, rc = 0); + + if (body->eadatasize == 0) { + CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n"); + GOTO(out, rc = -EPROTO); + } + + /* The MDS sent back the EA because we unlinked the last reference + * to this file. Use this EA to unlink the objects on the OST. + * It's opaque so we don't swab here; we leave it to obd_unpackmd() to + * check it is complete and sensible. 
*/ + eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL); + LASSERT(eadata != NULL); + if (eadata == NULL) { + CERROR("Can't unpack MDS EA data\n"); + GOTO(out, rc = -EPROTO); + } + + rc = obd_unpackmd(llu_i2obdconn(dir), &lsm, eadata, body->eadatasize); + if (rc < 0) { + CERROR("obd_unpackmd: %d\n", rc); + GOTO(out, rc); + } + LASSERT(rc >= sizeof(*lsm)); + + oa = obdo_alloc(); + if (oa == NULL) + GOTO(out_free_memmd, rc = -ENOMEM); + + oa->o_id = lsm->lsm_object_id; + oa->o_mode = body->mode & S_IFMT; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE; + + if (body->valid & OBD_MD_FLCOOKIE) { + oa->o_valid |= OBD_MD_FLCOOKIE; + oti.oti_logcookies = lustre_msg_buf(request->rq_repmsg, 3, + body->eadatasize); + } + + rc = obd_destroy(llu_i2obdconn(dir), oa, lsm, &oti); + obdo_free(oa); + if (rc) + CERROR("obd destroy objid 0x"LPX64" error %d\n", + lsm->lsm_object_id, rc); + out_free_memmd: + obd_free_memmd(llu_i2obdconn(dir), &lsm); + out: + ptlrpc_req_finished(request); + return rc; +} +#endif + +static int llu_iop_link_raw(struct pnode *old, struct pnode *new) +{ + struct inode *src = old->p_base->pb_ino; + struct inode *dir = new->p_parent->p_base->pb_ino; + const char *name = new->p_base->pb_name.name; + int namelen = new->p_base->pb_name.len; + struct ptlrpc_request *request = NULL; + struct mdc_op_data op_data; + int rc; + ENTRY; + + LASSERT(src); + LASSERT(dir); + + llu_prepare_mdc_op_data(&op_data, src, dir, name, namelen, 0); + rc = mdc_link(llu_i2sbi(src)->ll_mdc_exp, &op_data, &request); + ptlrpc_req_finished(request); + + RETURN(rc); +} + +static int llu_iop_unlink_raw(struct pnode *pno) { struct inode *dir = pno->p_base->pb_parent->pb_ino; struct qstr *qstr = &pno->p_base->pb_name; + const char *name = qstr->name; + int len = qstr->len; + struct inode *target = pno->p_base->pb_ino; + struct ptlrpc_request *request = NULL; + struct mdc_op_data op_data; int rc; + ENTRY; - LASSERT(dir); + LASSERT(target); - rc = llu_mkdir2(dir, qstr->name, qstr->len, 
mode); + llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0); + rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, &op_data, &request); + if (!rc) { + rc = llu_objects_destroy(request, dir); - return rc; + llu_i2info(target)->lli_stale_flag = 1; + unhook_stale_inode(pno); + } + + ptlrpc_req_finished(request); + RETURN(rc); } -#ifndef S_IRWXUGO -#define S_IRWXUGO (S_IRWXU|S_IRWXG|S_IRWXO) +/* FIXME + * following cases need to be considered later: + * - rename an opened file/dir + * - an opened file be removed in rename + * - rename to remove and hardlink (?opened) + */ +static int llu_iop_rename_raw(struct pnode *old, struct pnode *new) +{ + struct inode *src = old->p_parent->p_base->pb_ino; + struct inode *tgt = new->p_parent->p_base->pb_ino; + struct inode *tgtinode = new->p_base->pb_ino; + const char *oldname = old->p_base->pb_name.name; + int oldnamelen = old->p_base->pb_name.len; + const char *newname = new->p_base->pb_name.name; + int newnamelen = new->p_base->pb_name.len; + struct ptlrpc_request *request = NULL; + struct mdc_op_data op_data; + int rc; + ENTRY; + + LASSERT(src); + LASSERT(tgt); + + llu_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0); + rc = mdc_rename(llu_i2sbi(src)->ll_mdc_exp, &op_data, + oldname, oldnamelen, newname, newnamelen, + &request); + if (!rc) { + rc = llu_objects_destroy(request, src); + + if (tgtinode) { + llu_i2info(tgtinode)->lli_stale_flag = 1; + unhook_stale_inode(new); + } + } + + ptlrpc_req_finished(request); + + RETURN(rc); +} + +#if 0 +static int llu_statfs_internal(struct llu_sb_info *sbi, + struct obd_statfs *osfs, + unsigned long max_age) +{ + struct obd_statfs obd_osfs; + int rc; + ENTRY; + + rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age); + if (rc) { + CERROR("mdc_statfs fails: rc = %d\n", rc); + RETURN(rc); + } + + CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", + osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); + + rc = 
obd_statfs(class_exp2obd(sbi->ll_osc_exp), &obd_osfs, max_age); + if (rc) { + CERROR("obd_statfs fails: rc = %d\n", rc); + RETURN(rc); + } + + CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", + obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree, + obd_osfs.os_files); + + osfs->os_blocks = obd_osfs.os_blocks; + osfs->os_bfree = obd_osfs.os_bfree; + osfs->os_bavail = obd_osfs.os_bavail; + + /* If we don't have as many objects free on the OST as inodes + * on the MDS, we reduce the total number of inodes to + * compensate, so that the "inodes in use" number is correct. + */ + if (obd_osfs.os_ffree < osfs->os_ffree) { + osfs->os_files = (osfs->os_files - osfs->os_ffree) + + obd_osfs.os_ffree; + osfs->os_ffree = obd_osfs.os_ffree; + } + + RETURN(rc); +} + +static int llu_statfs(struct llu_sb_info *sbi, struct kstatfs *sfs) +{ + struct obd_statfs osfs; + int rc; + + CDEBUG(D_VFSTRACE, "VFS Op:\n"); + + /* For now we will always get up-to-date statfs values, but in the + * future we may allow some amount of caching on the client (e.g. + * from QOS or lprocfs updates). 
*/ + rc = llu_statfs_internal(sbi, &osfs, jiffies - 1); + if (rc) + return rc; + + statfs_unpack(sfs, &osfs); + + if (sizeof(sfs->f_blocks) == 4) { + while (osfs.os_blocks > ~0UL) { + sfs->f_bsize <<= 1; + + osfs.os_blocks >>= 1; + osfs.os_bfree >>= 1; + osfs.os_bavail >>= 1; + } + } + + sfs->f_blocks = osfs.os_blocks; + sfs->f_bfree = osfs.os_bfree; + sfs->f_bavail = osfs.os_bavail; + + return 0; +} + +static int llu_iop_statvfs(struct pnode *pno, + struct inode *ino, + struct intnl_statvfs *buf) +{ + struct statfs fs; + int rc; + ENTRY; + +#ifndef __CYGWIN__ + LASSERT(pno->p_base->pb_ino); + rc = llu_statfs(llu_i2sbi(pno->p_base->pb_ino), &fs); + if (rc) + RETURN(rc); + + /* from native driver */ + buf->f_bsize = fs.f_bsize; /* file system block size */ + buf->f_frsize = fs.f_bsize; /* file system fundamental block size */ + buf->f_blocks = fs.f_blocks; + buf->f_bfree = fs.f_bfree; + buf->f_bavail = fs.f_bavail; + buf->f_files = fs.f_files; /* Total number serial numbers */ + buf->f_ffree = fs.f_ffree; /* Number free serial numbers */ + buf->f_favail = fs.f_ffree; /* Number free ser num for non-privileged*/ + buf->f_fsid = fs.f_fsid.__val[1]; + buf->f_flag = 0; /* No equiv in statfs; maybe use type? 
*/ + buf->f_namemax = fs.f_namelen; +#endif + + RETURN(0); +} #endif -static int llu_symlink2(struct inode *dir, const char *name, int len, - const char *tgt) +static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode) { + struct inode *dir = pno->p_base->pb_parent->pb_ino; + struct qstr *qstr = &pno->p_base->pb_name; + const char *name = qstr->name; + int len = qstr->len; struct ptlrpc_request *request = NULL; - time_t curtime = CURRENT_TIME; - struct llu_sb_info *sbi = llu_i2sbi(dir); struct llu_inode_info *lli = llu_i2info(dir); struct mdc_op_data op_data; int err = -EMLINK; ENTRY; + CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%lu(%p)\n", + name, lli->lli_st_ino, lli->lli_st_generation, dir); - CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu,target=%s\n", - name, lli->lli_st_ino, tgt); - -#if 0 - if (dir->i_nlink >= EXT2_LINK_MAX) + if (lli->lli_st_nlink >= EXT2_LINK_MAX) RETURN(err); -#endif + + mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR; llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0); - err = mdc_create(&sbi->ll_mdc_conn, &op_data, - tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO, + err = mdc_create(llu_i2sbi(dir)->ll_mdc_exp, &op_data, NULL, 0, mode, current->fsuid, current->fsgid, 0, &request); ptlrpc_req_finished(request); RETURN(err); } -static int llu_iop_symlink(struct pnode *pno, const char *data) +static int llu_iop_rmdir_raw(struct pnode *pno) { struct inode *dir = pno->p_base->pb_parent->pb_ino; struct qstr *qstr = &pno->p_base->pb_name; + const char *name = qstr->name; + int len = qstr->len; + struct ptlrpc_request *request = NULL; + struct mdc_op_data op_data; + struct llu_inode_info *lli = llu_i2info(dir); int rc; - - LASSERT(dir); + ENTRY; + CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%lu(%p)\n", + name, lli->lli_st_ino, lli->lli_st_generation, dir); - rc = llu_symlink2(dir, qstr->name, qstr->len, data); + llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, S_IFDIR); + rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, 
&op_data, &request); + ptlrpc_req_finished(request); - return rc; + /* libsysio: remove the pnode right away */ + if (!rc) { + llu_i2info(pno->p_base->pb_ino)->lli_stale_flag = 1; + unhook_stale_inode(pno); + } + + RETURN(rc); +} + +static int llu_iop_fcntl(struct inode *ino, int cmd, va_list ap) +{ + CERROR("liblustre did not support fcntl\n"); + return -ENOSYS; +} + +static int llu_iop_ioctl(struct inode *ino, unsigned long int request, + va_list ap) +{ + CERROR("liblustre did not support ioctl\n"); + return -ENOSYS; +} + +/* + * we already do syncronous read/write + */ +static int llu_iop_sync(struct inode *inode) +{ + return 0; +} + +static int llu_iop_datasync(struct inode *inode) +{ + return 0; } struct filesys_ops llu_filesys_ops = @@ -600,31 +1381,40 @@ struct filesys_ops llu_filesys_ops = fsop_gone: llu_fsop_gone, }; +struct inode *llu_iget(struct filesys *fs, struct lustre_md *md) +{ + struct inode *inode; + struct ll_fid fid; + struct file_identifier fileid = {&fid, sizeof(fid)}; + + if ((md->body->valid & + (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) != + (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) + CERROR("invalide fields!\n"); + + /* try to find existing inode */ + fid.id = md->body->ino; + fid.generation = md->body->generation; + fid.f_type = md->body->mode & S_IFMT; + + inode = _sysio_i_find(fs, &fileid); + if (inode) { + if (llu_i2info(inode)->lli_st_generation == + md->body->generation) { + llu_update_inode(inode, md->body, md->lsm); + return inode; + } else + I_RELE(inode); + } -static struct inode_ops llu_inode_ops = { - inop_lookup: llu_iop_lookup, - inop_getattr: llu_iop_getattr, - inop_setattr: llu_iop_setattr, - inop_getdirentries: NULL, - inop_mkdir: llu_iop_mkdir, - inop_rmdir: NULL, - inop_symlink: llu_iop_symlink, - inop_readlink: NULL, - inop_open: llu_iop_open, - inop_close: llu_iop_close, - inop_unlink: NULL, - inop_ipreadv: llu_iop_ipreadv, - inop_ipwritev: llu_iop_ipwritev, - inop_iodone: llu_iop_iodone, - inop_fcntl: NULL, 
- inop_sync: NULL, - inop_datasync: NULL, - inop_ioctl: NULL, - inop_mknod: NULL, - inop_statvfs: NULL, - inop_gone: llu_iop_gone, -}; + inode = llu_new_inode(fs, &fid); + if (inode) + llu_update_inode(inode, md->body, md->lsm); + + return inode; +} +extern struct list_head lustre_profile_list; static int llu_fsswop_mount(const char *source, @@ -636,22 +1426,23 @@ llu_fsswop_mount(const char *source, struct filesys *fs; struct inode *root; struct pnode_base *rootpb; - static struct qstr noname = { NULL, 0, 0 }; + struct obd_device *obd; struct ll_fid rootfid; - struct llu_sb_info *sbi; - struct ptlrpc_connection *mdc_conn; + struct obd_statfs osfs; + static struct qstr noname = { NULL, 0, 0 }; struct ptlrpc_request *request = NULL; - struct mds_body *root_body; - struct obd_uuid param_uuid; + struct lustre_handle mdc_conn = {0, }; + struct lustre_handle osc_conn = {0, }; + struct lustre_md md; class_uuid_t uuid; - struct obd_device *obd; - char *osc=mount_option.osc_uuid; - char *mdc=mount_option.mdc_uuid; + struct lustre_profile *lprof; + char *osc = NULL, *mdc = NULL; int err = -EINVAL; ENTRY; + /* allocate & initialize sbi */ OBD_ALLOC(sbi, sizeof(*sbi)); if (!sbi) RETURN(-ENOMEM); @@ -660,6 +1451,72 @@ llu_fsswop_mount(const char *source, generate_random_uuid(uuid); class_uuid_unparse(uuid, &sbi->ll_sb_uuid); + /* zeroconf */ + if (g_zconf) { + struct config_llog_instance cfg; + int len; + + if (!g_zconf_mdsname) { + CERROR("no mds name\n"); + GOTO(out_free, err = -EINVAL); + } + + /* XXX */ + /* generate a string unique to this super, let's try + the address of the super itself.*/ + len = (sizeof(sbi) * 2) + 1; + OBD_ALLOC(sbi->ll_instance, len); + if (sbi->ll_instance == NULL) + GOTO(out_free, err = -ENOMEM); + sprintf(sbi->ll_instance, "%p", sbi); + + cfg.cfg_instance = sbi->ll_instance; + cfg.cfg_uuid = sbi->ll_sb_uuid; + err = liblustre_process_log(&cfg); + if (err < 0) { + CERROR("Unable to process log: %s\n", g_zconf_profile); + + GOTO(out_free, err); + 
} + + lprof = class_get_profile(g_zconf_profile); + if (lprof == NULL) { + CERROR("No profile found: %s\n", g_zconf_profile); + GOTO(out_free, err = -EINVAL); + } + if (osc) + OBD_FREE(osc, strlen(osc) + 1); + OBD_ALLOC(osc, strlen(lprof->lp_osc) + + strlen(sbi->ll_instance) + 2); + sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance); + + if (mdc) + OBD_FREE(mdc, strlen(mdc) + 1); + OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + + strlen(sbi->ll_instance) + 2); + sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance); + } else { + /* setup from dump_file */ + if (list_empty(&lustre_profile_list)) { + CERROR("no profile\n"); + GOTO(out_free, err = -EINVAL); + } + + lprof = list_entry(lustre_profile_list.next, + struct lustre_profile, lp_list); + osc = lprof->lp_osc; + mdc = lprof->lp_mdc; + } + + if (!osc) { + CERROR("no osc\n"); + GOTO(out_free, err = -EINVAL); + } + if (!mdc) { + CERROR("no mdc\n"); + GOTO(out_free, err = -EINVAL); + } + fs = _sysio_fs_new(&llu_filesys_ops, flags, sbi); if (!fs) { err = -ENOMEM; @@ -669,65 +1526,72 @@ llu_fsswop_mount(const char *source, obd = class_name2obd(mdc); if (!obd) { CERROR("MDC %s: not setup or attached\n", mdc); - err = -EINVAL; - goto out_free; + GOTO(out_free, err = -EINVAL); } + if (mdc_init_ea_size(obd, osc)) + GOTO(out_free, err = -EINVAL); + /* setup mdc */ - /* FIXME need recover stuff */ - err = obd_connect(&sbi->ll_mdc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid); if (err) { CERROR("cannot connect to %s: rc = %d\n", mdc, err); - goto out_free; + GOTO(out_free, err); } + sbi->ll_mdc_exp = class_conn2export(&mdc_conn); - mdc_conn = sbi2mdc(sbi)->cl_import->imp_connection; + err = obd_statfs(obd, &osfs, 100000000); + if (err) + GOTO(out_mdc, err); + + /* + * FIXME fill fs stat data into sbi here!!! 
FIXME + */ /* setup osc */ obd = class_name2obd(osc); if (!obd) { CERROR("OSC %s: not setup or attached\n", osc); - err = -EINVAL; - goto out_mdc; + GOTO(out_mdc, err = -EINVAL); } - err = obd_connect(&sbi->ll_osc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid); if (err) { CERROR("cannot connect to %s: rc = %d\n", osc, err); - goto out_mdc; + GOTO(out_mdc, err); } + sbi->ll_osc_exp = class_conn2export(&osc_conn); - err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid); + err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - goto out_osc; + GOTO(out_osc, err); } CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id); sbi->ll_rootino = rootfid.id; -/* XXX do we need this?? - memset(&osfs, 0, sizeof(osfs)); - rc = obd_statfs(class_conn2obd(&sbi->ll_mdc_conn),&osfs,jiffies-100*HZ); -*/ /* fetch attr of root inode */ - err = mdc_getattr(&sbi->ll_mdc_conn, &rootfid, + err = mdc_getattr(sbi->ll_mdc_exp, &rootfid, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); if (err) { CERROR("mdc_getattr failed for root: rc = %d\n", err); - goto out_request; + GOTO(out_osc, err); + } + + err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); + if (err) { + CERROR("failed to understand root inode md: rc = %d\n",err); + GOTO(out_request, err); } - root_body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*root_body)); LASSERT(sbi->ll_rootino != 0); - root = llu_new_inode(fs, root_body->ino, root_body->mode); - if (!root) { - err = -ENOMEM; - goto out_request; + root = llu_iget(fs, &md); + if (root == NULL) { + CERROR("fail to generate root inode\n"); + GOTO(out_request, err = -EBADF); } - llu_update_inode(root, root_body, NULL); - /* * Generate base path-node for root. 
*/ @@ -737,18 +1601,15 @@ llu_fsswop_mount(const char *source, goto out_inode; } - err = _sysio_do_mount(fs, rootpb, flags, NULL, mntp); + err = _sysio_do_mount(fs, rootpb, flags, tocover, mntp); if (err) { _sysio_pb_gone(rootpb); goto out_inode; } ptlrpc_req_finished(request); - request = NULL; - printf("************************************************\n"); - printf("* Mount successfully!!!!!!! *\n"); - printf("************************************************\n"); + printf("LibLustre: namespace mounted successfully!\n"); return 0; @@ -757,9 +1618,9 @@ out_inode: out_request: ptlrpc_req_finished(request); out_osc: - obd_disconnect(&sbi->ll_osc_conn, 0); + obd_disconnect(sbi->ll_osc_exp, 0); out_mdc: - obd_disconnect(&sbi->ll_mdc_conn, 0); + obd_disconnect(sbi->ll_mdc_exp, 0); out_free: OBD_FREE(sbi, sizeof(*sbi)); return err; @@ -769,3 +1630,31 @@ struct fssw_ops llu_fssw_ops = { llu_fsswop_mount }; +static struct inode_ops llu_inode_ops = { + inop_lookup: llu_iop_lookup, + inop_getattr: llu_iop_getattr, + inop_setattr: llu_iop_setattr, + inop_getdirentries: NULL, + inop_mkdir: llu_iop_mkdir_raw, + inop_rmdir: llu_iop_rmdir_raw, + inop_symlink: llu_iop_symlink_raw, + inop_readlink: llu_iop_readlink, + inop_open: llu_iop_open, + inop_close: llu_iop_close, + inop_link: llu_iop_link_raw, + inop_unlink: llu_iop_unlink_raw, + inop_rename: llu_iop_rename_raw, + inop_ipreadv: llu_iop_ipreadv, + inop_ipwritev: llu_iop_ipwritev, + inop_iodone: llu_iop_iodone, + inop_fcntl: llu_iop_fcntl, + inop_sync: llu_iop_sync, + inop_datasync: llu_iop_datasync, + inop_ioctl: llu_iop_ioctl, + inop_mknod: llu_iop_mknod_raw, +#if 0 + inop_statvfs: llu_iop_statvfs, +#endif + inop_gone: llu_iop_gone, +}; + -- 1.8.3.1