X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosd-ldiskfs%2Fosd_handler.c;h=9dbef7cf6039a32fed7d2e2f96b98b4bff6c2bda;hb=dac584c6946d15e1ca9e6feeb26b164768041c40;hp=58a1e1a9239b9ec78446bcb8e5e020659cbfb225;hpb=4d408c9aed9adaf1f4e2ea87851728a1cf662594;p=fs%2Flustre-release.git diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 58a1e1a..9dbef7c 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -44,6 +44,10 @@ #define DEBUG_SUBSYSTEM S_OSD #include +#include +#ifdef HAVE_UIDGID_HEADER +# include +#endif /* LUSTRE_VERSION_CODE */ #include @@ -54,6 +58,9 @@ /* XATTR_{REPLACE,CREATE} */ #include +#include +#include +#undef ENTRY /* * struct OBD_{ALLOC,FREE}*() * OBD_FAIL_CHECK @@ -72,7 +79,6 @@ #include #include -#include #include int ldiskfs_pdo = 1; @@ -127,7 +133,6 @@ int osd_trans_declare_op2rb[] = { [OSD_OT_WRITE] = OSD_OT_WRITE, [OSD_OT_INSERT] = OSD_OT_DELETE, [OSD_OT_DELETE] = OSD_OT_INSERT, - [OSD_OT_UPDATE] = OSD_OT_MAX, [OSD_OT_QUOTA] = OSD_OT_MAX, }; @@ -296,133 +301,6 @@ osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev, return inode; } -static struct inode *osd_iget_check(struct osd_thread_info *info, - struct osd_device *dev, - const struct lu_fid *fid, - struct osd_inode_id *id, - bool in_oi) -{ - struct inode *inode; - int rc = 0; - ENTRY; - - inode = ldiskfs_iget(osd_sb(dev), id->oii_ino); - if (IS_ERR(inode)) { - rc = PTR_ERR(inode); - if (!in_oi || (rc != -ENOENT && rc != -ESTALE)) { - CDEBUG(D_INODE, "no inode: ino = %u, rc = %d\n", - id->oii_ino, rc); - - GOTO(put, rc); - } - - goto check_oi; - } - - if (is_bad_inode(inode)) { - rc = -ENOENT; - if (!in_oi) { - CDEBUG(D_INODE, "bad inode: ino = %u\n", id->oii_ino); - - GOTO(put, rc); - } - - goto check_oi; - } - - if (id->oii_gen != OSD_OII_NOGEN && - inode->i_generation != id->oii_gen) { - rc = -ESTALE; - if (!in_oi) { - CDEBUG(D_INODE, "unmatched inode: ino = %u, " - "oii_gen = %u, i_generation = %u\n", - id->oii_ino, id->oii_gen, inode->i_generation); - - GOTO(put, rc); - } - - goto check_oi; - } - - if (inode->i_nlink == 0) { - rc = -ENOENT; - if (!in_oi) { - CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino); - - GOTO(put, rc); - } - - goto check_oi; - } - -check_oi: - if (rc != 0) { - struct osd_inode_id saved_id = *id; - - LASSERTF(rc == -ESTALE || rc == -ENOENT, "rc = %d\n", rc); - - rc = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD); - /* XXX: There are some possible cases: - * 1. rc = 0. - * Backup/restore caused the OI invalid. - * 2. rc = 0. - * Someone unlinked the object but NOT removed - * the OI mapping, such as mount target device - * as ldiskfs, and modify something directly. - * 3. rc = -ENOENT. - * Someone just removed the object between the - * former oi_lookup and the iget. It is normal. - * 4. Other failure cases. - * - * Generally, when the device is mounted, it will - * auto check whether the system is restored from - * file-level backup or not. We trust such detect - * to distinguish the 1st case from the 2nd case. */ - if (rc == 0) { - if (!IS_ERR(inode) && inode->i_generation != 0 && - inode->i_generation == id->oii_gen) { - rc = -ENOENT; - } else { - __u32 level = D_LFSCK; - - rc = -EREMCHG; - if (!thread_is_running(&dev->od_scrub.os_thread)) - level |= D_CONSOLE; - - CDEBUG(level, "%s: the OI mapping for the FID " - DFID" become inconsistent, the given ID " - "%u/%u, the ID in OI mapping %u/%u\n", - osd_name(dev), PFID(fid), - saved_id.oii_ino, saved_id.oii_gen, - id->oii_ino, id->oii_ino); - } - } - } else { - if (id->oii_gen == OSD_OII_NOGEN) - osd_id_gen(id, inode->i_ino, inode->i_generation); - - /* Do not update file c/mtime in ldiskfs. - * NB: we don't have any lock to protect this because we don't - * have reference on osd_object now, but contention with - * another lookup + attr_set can't happen in the tiny window - * between if (...) and set S_NOCMTIME. */ - if (!(inode->i_flags & S_NOCMTIME)) - inode->i_flags |= S_NOCMTIME; - } - - GOTO(put, rc); - -put: - if (rc != 0) { - if (!IS_ERR(inode)) - iput(inode); - - inode = ERR_PTR(rc); - } - - return inode; -} - /** * \retval +v: new filter_fid, does not contain self-fid * \retval 0: filter_fid_old, contains self-fid @@ -523,14 +401,12 @@ static int osd_check_lma(const struct lu_env *env, struct osd_object *obj) lma->lma_incompat & ~LMA_INCOMPAT_SUPP, PFID(rfid), inode->i_ino); rc = -EOPNOTSUPP; - } else if (!(lma->lma_compat & LMAC_NOT_IN_OI)) { + } else { fid = &lma->lma_self_fid; } } if (fid != NULL && unlikely(!lu_fid_eq(rfid, fid))) { - __u32 level = D_LFSCK; - if (fid_is_idif(rfid) && fid_is_idif(fid)) { struct ost_id *oi = &info->oti_ostid; struct lu_fid *fid1 = &info->oti_fid3; @@ -554,13 +430,7 @@ static int osd_check_lma(const struct lu_env *env, struct osd_object *obj) } } - rc = -EREMCHG; - if (!thread_is_running(&osd->od_scrub.os_thread)) - level |= D_CONSOLE; - - CDEBUG(level, "%s: FID "DFID" != self_fid "DFID"\n", - osd_name(osd), PFID(rfid), PFID(fid)); } RETURN(rc); @@ -581,6 +451,7 @@ static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj, int result; int saved = 0; bool in_oi = false; + bool in_cache = false; bool triggered = false; ENTRY; @@ -610,6 +481,7 @@ static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj, if (lu_fid_eq(fid, &oic->oic_fid) && likely(oic->oic_dev == dev)) { id = &oic->oic_lid; + in_cache = true; goto iget; } @@ -624,8 +496,7 @@ static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj, /* Search order: 3. OI files. */ result = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD); if (result == -ENOENT) { - if (!fid_is_norm(fid) || - fid_is_on_ost(info, dev, fid, OI_CHECK_FLD) || + if (!(fid_is_norm(fid) || fid_is_igif(fid)) || !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid), sf->sf_oi_bitmap)) GOTO(out, result = 0); @@ -639,91 +510,107 @@ static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj, in_oi = true; iget: - inode = osd_iget_check(info, dev, fid, id, in_oi); + inode = osd_iget(info, dev, id); if (IS_ERR(inode)) { result = PTR_ERR(inode); - if (result == -ENOENT || result == -ESTALE) { - if (!in_oi) - fid_zero(&oic->oic_fid); + if (result != -ENOENT && result != -ESTALE) + GOTO(out, result); - GOTO(out, result = -ENOENT); - } else if (result == -EREMCHG) { + if (in_cache) + fid_zero(&oic->oic_fid); -trigger: - if (!in_oi) - fid_zero(&oic->oic_fid); + result = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD); + if (result != 0) + GOTO(out, result = (result == -ENOENT ? 0 : result)); - if (unlikely(triggered)) - GOTO(out, result = saved); + /* The OI mapping is there, but the inode is NOT there. + * Two possible cases for that: + * + * 1) Backup/restore caused the OI invalid. + * 2) Someone unlinked the object but NOT removed + * the OI mapping, such as mount target device + * as ldiskfs, and modify something directly. + * + * Generally, when the device is mounted, it will + * auto check whether the system is restored from + * file-level backup or not. We trust such detect + * to distinguish the 1st case from the 2nd case. */ + if (!(scrub->os_file.sf_flags & SF_INCONSISTENT)) + GOTO(out, result = 0); - triggered = true; - if (thread_is_running(&scrub->os_thread)) { +trigger: + if (unlikely(triggered)) + GOTO(out, result = saved); + + triggered = true; + if (thread_is_running(&scrub->os_thread)) { + result = -EINPROGRESS; + } else if (!dev->od_noscrub) { + /* Since we do not know the right OI mapping, we have + * to trigger OI scrub to scan the whole device. */ + result = osd_scrub_start(dev, SS_AUTO_FULL | + SS_CLEAR_DRYRUN | SS_CLEAR_FAILOUT); + CDEBUG(D_LFSCK | D_CONSOLE, "%.16s: trigger OI " + "scrub by RPC for "DFID", rc = %d [1]\n", + osd_name(dev), PFID(fid), result); + if (result == 0 || result == -EALREADY) result = -EINPROGRESS; - } else if (!dev->od_noscrub) { - /* Since we do not know the right OI mapping, - * we have to trigger OI scrub to scan the - * whole device. */ - result = osd_scrub_start(dev, SS_AUTO_FULL | - SS_CLEAR_DRYRUN | SS_CLEAR_FAILOUT); - CDEBUG(D_LFSCK | D_CONSOLE, "%.16s: trigger OI " - "scrub by RPC for "DFID", rc = %d [1]\n", - osd_name(dev), PFID(fid),result); - if (result == 0 || result == -EALREADY) - result = -EINPROGRESS; - else - result = -EREMCHG; - } - - /* We still have chance to get the valid inode: for the - * object which is referenced by remote name entry, the - * object on the local MDT will be linked under the dir - * of "/REMOTE_PARENT_DIR" with its FID string as name. - * - * We do not know whether the object for the given FID - * is referenced by some remote name entry or not, and - * especially for DNE II, a multiple-linked object may - * have many name entries reside on many MDTs. - * - * To simplify the operation, OSD will not distinguish - * more, just lookup "/REMOTE_PARENT_DIR". Usually, it - * only happened for the RPC from other MDT during the - * OI scrub, or for the client side RPC with FID only, - * such as FID to path, or from old connected client. */ - saved = result; - result = osd_lookup_in_remote_parent(info, dev, - fid, id); - if (result == 0) { - in_oi = false; - goto iget; - } + else + result = -EREMCHG; + } - result = saved; + /* We still have chance to get the valid inode: for the + * object which is referenced by remote name entry, the + * object on the local MDT will be linked under the dir + * of "/REMOTE_PARENT_DIR" with its FID string as name. + * + * We do not know whether the object for the given FID + * is referenced by some remote name entry or not, and + * especially for DNE II, a multiple-linked object may + * have many name entries reside on many MDTs. + * + * To simplify the operation, OSD will not distinguish + * more, just lookup "/REMOTE_PARENT_DIR". Usually, it + * only happened for the RPC from other MDT during the + * OI scrub, or for the client side RPC with FID only, + * such as FID to path, or from old connected client. */ + saved = result; + result = osd_lookup_in_remote_parent(info, dev, fid, id); + if (result == 0) { + in_oi = false; + goto iget; } - GOTO(out, result); - } + GOTO(out, result = saved); + } - obj->oo_inode = inode; - LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); + obj->oo_inode = inode; + LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); result = osd_check_lma(env, obj); if (result != 0) { iput(inode); obj->oo_inode = NULL; - if (result == -EREMCHG) { - if (!in_oi) { - result = osd_oi_lookup(info, dev, fid, id, - OI_CHECK_FLD); - if (result != 0) { - fid_zero(&oic->oic_fid); - GOTO(out, result); - } - } + if (result != -EREMCHG) + GOTO(out, result); + + if (in_cache) + fid_zero(&oic->oic_fid); + + result = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD); + if (result == 0) goto trigger; - } - GOTO(out, result); + if (result != -ENOENT) + GOTO(out, result); + + if (!in_oi && (fid_is_norm(fid) || fid_is_igif(fid)) && + ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), + sf->sf_oi_bitmap)) + goto trigger; + + GOTO(out, result = 0); } obj->oo_compat_dot_created = 1; @@ -958,7 +845,7 @@ static void osd_trans_commit_cb(struct super_block *sb, lu_context_exit(&th->th_ctx); lu_context_fini(&th->th_ctx); - thandle_put(th); + OBD_FREE_PTR(oh); } static struct thandle *osd_trans_create(const struct lu_env *env, @@ -983,9 +870,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env, th->th_result = 0; th->th_tags = LCT_TX_HANDLE; oh->ot_credits = 0; - atomic_set(&th->th_refc, 1); - th->th_alloc_size = sizeof(*oh); - oti->oti_dev = osd_dt_dev(d); INIT_LIST_HEAD(&oh->ot_dcb_list); osd_th_alloced(oh); @@ -1132,7 +1016,8 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, struct osd_thandle *oh; struct osd_thread_info *oti = osd_oti_get(env); struct osd_iobuf *iobuf = &oti->oti_iobuf; - struct qsd_instance *qsd = oti->oti_dev->od_quota_slave; + struct osd_device *osd = osd_dt_dev(th->th_dev); + struct qsd_instance *qsd = osd->od_quota_slave; struct lquota_trans *qtrans; ENTRY; @@ -1156,19 +1041,20 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, oti->oti_txns--; rc = dt_txn_hook_stop(env, th); if (rc != 0) - CERROR("Failure in transaction hook: %d\n", rc); + CERROR("%s: failed in transaction hook: rc = %d\n", + osd_name(osd), rc); /* hook functions might modify th_sync */ hdl->h_sync = th->th_sync; - oh->ot_handle = NULL; - OSD_CHECK_SLOW_TH(oh, oti->oti_dev, - rc = ldiskfs_journal_stop(hdl)); - if (rc != 0) - CERROR("Failure to stop transaction: %d\n", rc); - } else { - thandle_put(&oh->ot_super); - } + oh->ot_handle = NULL; + OSD_CHECK_SLOW_TH(oh, osd, rc = ldiskfs_journal_stop(hdl)); + if (rc != 0) + CERROR("%s: failed to stop transaction: rc = %d\n", + osd_name(osd), rc); + } else { + OBD_FREE_PTR(oh); + } /* inform the quota slave device that the transaction is stopping */ qsd_op_end(env, qsd, qtrans); @@ -1184,7 +1070,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, */ wait_event(iobuf->dr_wait, atomic_read(&iobuf->dr_numreqs) == 0); - osd_fini_iobuf(oti->oti_dev, iobuf); + osd_fini_iobuf(osd, iobuf); if (!rc) rc = iobuf->dr_error; @@ -1454,23 +1340,6 @@ static int osd_ro(const struct lu_env *env, struct dt_device *d) RETURN(rc); } -/* - * Concurrency: serialization provided by callers. - */ -static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d, - int mode, unsigned long timeout, __u32 alg, - struct lustre_capa_key *keys) -{ - struct osd_device *dev = osd_dt_dev(d); - ENTRY; - - dev->od_fl_capa = mode; - dev->od_capa_timeout = timeout; - dev->od_capa_alg = alg; - dev->od_capa_keys = keys; - RETURN(0); -} - /** * Note: we do not count into QUOTA here. * If we mount with --data_journal we may need more. @@ -1540,7 +1409,6 @@ static const struct dt_device_operations osd_dt_ops = { .dt_sync = osd_sync, .dt_ro = osd_ro, .dt_commit_async = osd_commit_async, - .dt_init_capa_ctxt = osd_init_capa_ctxt, }; static void osd_object_read_lock(const struct lu_env *env, @@ -1612,108 +1480,6 @@ static int osd_object_write_locked(const struct lu_env *env, return obj->oo_owner == env; } -static int capa_is_sane(const struct lu_env *env, - struct osd_device *dev, - struct lustre_capa *capa, - struct lustre_capa_key *keys) -{ - struct osd_thread_info *oti = osd_oti_get(env); - struct lustre_capa *tcapa = &oti->oti_capa; - struct obd_capa *oc; - int i, rc = 0; - ENTRY; - - oc = capa_lookup(dev->od_capa_hash, capa, 0); - if (oc) { - if (capa_is_expired(oc)) { - DEBUG_CAPA(D_ERROR, capa, "expired"); - rc = -ESTALE; - } - capa_put(oc); - RETURN(rc); - } - - if (capa_is_expired_sec(capa)) { - DEBUG_CAPA(D_ERROR, capa, "expired"); - RETURN(-ESTALE); - } - - spin_lock(&capa_lock); - for (i = 0; i < 2; i++) { - if (keys[i].lk_keyid == capa->lc_keyid) { - oti->oti_capa_key = keys[i]; - break; - } - } - spin_unlock(&capa_lock); - - if (i == 2) { - DEBUG_CAPA(D_ERROR, capa, "no matched capa key"); - RETURN(-ESTALE); - } - - rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key); - if (rc) - RETURN(rc); - - if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) { - DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch"); - RETURN(-EACCES); - } - - oc = capa_add(dev->od_capa_hash, capa); - capa_put(oc); - - RETURN(0); -} - -int osd_object_auth(const struct lu_env *env, struct dt_object *dt, - struct lustre_capa *capa, __u64 opc) -{ - const struct lu_fid *fid = lu_object_fid(&dt->do_lu); - struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); - struct lu_capainfo *lci; - int rc; - - if (!osd->od_fl_capa) - return 0; - - if (capa == BYPASS_CAPA) - return 0; - - lci = lu_capainfo_get(env); - if (unlikely(lci == NULL)) - return 0; - - if (lci->lci_auth == LC_ID_NONE) - return 0; - - if (capa == NULL) { - CERROR("%s: no capability provided for FID "DFID": rc = %d\n", - osd_name(osd), PFID(fid), -EACCES); - return -EACCES; - } - - if (!lu_fid_eq(fid, &capa->lc_fid)) { - DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with", - PFID(fid)); - return -EACCES; - } - - if (!capa_opc_supported(capa, opc)) { - DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc); - return -EACCES; - } - - rc = capa_is_sane(env, osd, capa, osd->od_capa_keys); - if (rc != 0) { - DEBUG_CAPA(D_ERROR, capa, "insane: rc = %d", rc); - return -EACCES; - } - - return 0; -} - static struct timespec *osd_inode_time(const struct lu_env *env, struct inode *inode, __u64 seconds) { @@ -1752,8 +1518,7 @@ static void osd_inode_getattr(const struct lu_env *env, static int osd_attr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct lustre_capa *capa) + struct lu_attr *attr) { struct osd_object *obj = osd_dt_obj(dt); @@ -1763,9 +1528,6 @@ static int osd_attr_get(const struct lu_env *env, LASSERT(!dt_object_remote(dt)); LINVRNT(osd_invariant(obj)); - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) - return -EACCES; - spin_lock(&obj->oo_guard); osd_inode_getattr(env, obj->oo_inode, attr); spin_unlock(&obj->oo_guard); @@ -1995,8 +1757,7 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr) static int osd_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle, - struct lustre_capa *capa) + struct thandle *handle) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode; @@ -2009,9 +1770,6 @@ static int osd_attr_set(const struct lu_env *env, LASSERT(!dt_object_remote(dt)); LASSERT(osd_invariant(obj)); - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) - return -EACCES; - osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET); if (OBD_FAIL_CHECK(OBD_FAIL_OSD_FID_MAPPING)) { @@ -2325,17 +2083,19 @@ static int __osd_object_create(struct osd_thread_info *info, result = osd_create_type_f(dof->dof_type)(info, obj, attr, hint, dof, th); - if (result == 0) { - osd_attr_init(info, obj, attr, dof); - osd_object_init0(obj); - } - - if (obj->oo_inode != NULL) { + if (likely(obj->oo_inode != NULL)) { LASSERT(obj->oo_inode->i_state & I_NEW); + /* Unlock the inode before attr initialization to avoid + * unnecessary dqget operations. LU-6378 */ unlock_new_inode(obj->oo_inode); } + if (likely(result == 0)) { + osd_attr_init(info, obj, attr, dof); + osd_object_init0(obj); + } + /* restore previous umask value */ current->fs->umask = umask; @@ -2984,8 +2744,7 @@ static int osd_object_version_get(const struct lu_env *env, * Concurrency: @dt is read locked. */ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, const char *name, - struct lustre_capa *capa) + struct lu_buf *buf, const char *name) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -3014,9 +2773,6 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, LASSERT(inode->i_op != NULL); LASSERT(inode->i_op->getxattr != NULL); - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) - return -EACCES; - return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len); } @@ -3083,8 +2839,8 @@ static void osd_object_version_set(const struct lu_env *env, * Concurrency: @dt is write locked. */ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, const char *name, int fl, - struct thandle *handle, struct lustre_capa *capa) + const struct lu_buf *buf, const char *name, int fl, + struct thandle *handle) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -3103,9 +2859,6 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, return sizeof(dt_obj_version_t); } - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) - return -EACCES; - CDEBUG(D_INODE, DFID" set xattr '%s' with size %zu\n", PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len); @@ -3144,7 +2897,7 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, * Concurrency: @dt is read locked. */ static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, struct lustre_capa *capa) + const struct lu_buf *buf) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -3158,9 +2911,6 @@ static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, LASSERT(inode->i_op != NULL); LASSERT(inode->i_op->listxattr != NULL); - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) - return -EACCES; - dentry->d_inode = inode; dentry->d_sb = inode->i_sb; return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len); @@ -3194,8 +2944,7 @@ static int osd_declare_xattr_del(const struct lu_env *env, * Concurrency: @dt is write locked. */ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *handle, - struct lustre_capa *capa) + const char *name, struct thandle *handle) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -3211,9 +2960,6 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, LASSERT(inode->i_op->removexattr != NULL); LASSERT(handle != NULL); - if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) - return -EACCES; - osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET); ll_vfs_dq_init(inode); @@ -3223,95 +2969,6 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, return rc; } -static struct obd_capa *osd_capa_get(const struct lu_env *env, - struct dt_object *dt, - struct lustre_capa *old, __u64 opc) -{ - struct osd_thread_info *info = osd_oti_get(env); - const struct lu_fid *fid = lu_object_fid(&dt->do_lu); - struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); - struct lustre_capa_key *key = &info->oti_capa_key; - struct lustre_capa *capa = &info->oti_capa; - struct obd_capa *oc; - struct lu_capainfo *lci; - int rc; - ENTRY; - - if (!osd->od_fl_capa) - RETURN(ERR_PTR(-ENOENT)); - - if (!dt_object_exists(dt)) - RETURN(ERR_PTR(-ENOENT)); - - LASSERT(!dt_object_remote(dt)); - LINVRNT(osd_invariant(obj)); - - /* renewal sanity check */ - if (old && osd_object_auth(env, dt, old, opc)) - RETURN(ERR_PTR(-EACCES)); - - lci = lu_capainfo_get(env); - if (unlikely(lci == NULL)) - RETURN(ERR_PTR(-ENOENT)); - - switch (lci->lci_auth) { - case LC_ID_NONE: - RETURN(NULL); - case LC_ID_PLAIN: - capa->lc_uid = i_uid_read(obj->oo_inode); - capa->lc_gid = i_gid_read(obj->oo_inode); - capa->lc_flags = LC_ID_PLAIN; - break; - case LC_ID_CONVERT: { - __u32 d[4], s[4]; - - s[0] = i_uid_read(obj->oo_inode); - cfs_get_random_bytes(&(s[1]), sizeof(__u32)); - s[2] = i_uid_read(obj->oo_inode); - cfs_get_random_bytes(&(s[3]), sizeof(__u32)); - rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN); - if (unlikely(rc)) - RETURN(ERR_PTR(rc)); - - capa->lc_uid = ((__u64)d[1] << 32) | d[0]; - capa->lc_gid = ((__u64)d[3] << 32) | d[2]; - capa->lc_flags = LC_ID_CONVERT; - break; - } - default: - RETURN(ERR_PTR(-EINVAL)); - } - - capa->lc_fid = *fid; - capa->lc_opc = opc; - capa->lc_flags |= osd->od_capa_alg << 24; - capa->lc_timeout = osd->od_capa_timeout; - capa->lc_expiry = 0; - - oc = capa_lookup(osd->od_capa_hash, capa, 1); - if (oc) { - LASSERT(!capa_is_expired(oc)); - RETURN(oc); - } - - spin_lock(&capa_lock); - *key = osd->od_capa_keys[1]; - spin_unlock(&capa_lock); - - capa->lc_keyid = key->lk_keyid; - capa->lc_expiry = cfs_time_current_sec() + osd->od_capa_timeout; - - rc = capa_hmac(capa->lc_hmac, capa, key->lk_key); - if (rc) { - DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc); - RETURN(ERR_PTR(rc)); - } - - oc = capa_add(osd->od_capa_hash, capa); - RETURN(oc); -} - static int osd_object_sync(const struct lu_env *env, struct dt_object *dt, __u64 start, __u64 end) { @@ -3336,16 +2993,6 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -static int osd_data_get(const struct lu_env *env, struct dt_object *dt, - void **data) -{ - struct osd_object *obj = osd_dt_obj(dt); - ENTRY; - - *data = (void *)obj->oo_inode; - RETURN(0); -} - /* * Index operations. */ @@ -3481,8 +3128,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, static int osd_otable_it_attr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct lustre_capa *capa) + struct lu_attr *attr) { attr->la_valid = 0; return 0; @@ -3513,9 +3159,7 @@ static const struct dt_object_operations osd_obj_ops = { .do_declare_xattr_del = osd_declare_xattr_del, .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, - .do_data_get = osd_data_get, }; /** @@ -3547,9 +3191,7 @@ static const struct dt_object_operations osd_obj_ea_ops = { .do_declare_xattr_del = osd_declare_xattr_del, .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, - .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, - .do_data_get = osd_data_get, }; static const struct dt_object_operations osd_obj_otable_it_ops = { @@ -3586,8 +3228,7 @@ static int osd_index_declare_iam_delete(const struct lu_env *env, */ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa) + struct thandle *handle) { struct osd_thread_info *oti = osd_oti_get(env); struct osd_object *obj = osd_dt_obj(dt); @@ -3605,9 +3246,6 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(bag->ic_object == obj->oo_inode); LASSERT(handle != NULL); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) - RETURN(-EACCES); - osd_trans_exec_op(env, handle, OSD_OT_DELETE); ipd = osd_idx_ipd_get(env, bag); @@ -3704,8 +3342,7 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, * \retval -ve, on error */ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *handle, - struct lustre_capa *capa) + const struct dt_key *key, struct thandle *handle) { struct osd_object *obj = osd_dt_obj(dt); struct inode *dir = obj->oo_inode; @@ -3732,9 +3369,6 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(oh->ot_handle != NULL); LASSERT(oh->ot_handle->h_transaction != NULL); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) - RETURN(-EACCES); - ll_vfs_dq_init(dir); dentry = osd_child_dentry_get(env, obj, (char *)key, strlen((char *)key)); @@ -3863,8 +3497,7 @@ out: * \retval -ve failure */ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct osd_object *obj = osd_dt_obj(dt); struct iam_path_descr *ipd; @@ -3882,9 +3515,6 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, LASSERT(!dt_object_remote(dt)); LASSERT(bag->ic_object == obj->oo_inode); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) - RETURN(-EACCES); - ipd = osd_idx_ipd_get(env, bag); if (IS_ERR(ipd)) RETURN(-ENOMEM); @@ -3956,7 +3586,7 @@ static int osd_index_declare_iam_insert(const struct lu_env *env, static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa, int ignore_quota) + int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); struct iam_path_descr *ipd; @@ -3977,9 +3607,6 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, LASSERT(bag->ic_object == obj->oo_inode); LASSERT(th != NULL); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) - RETURN(-EACCES); - osd_trans_exec_op(env, th, OSD_OT_INSERT); ipd = osd_idx_ipd_get(env, bag); @@ -4580,9 +4207,9 @@ static int osd_index_declare_ea_insert(const struct lu_env *env, * \retval -ve, on error */ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa, int ignore_quota) + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th, + int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_dev(dt->do_lu.lo_dev); @@ -4605,9 +4232,6 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, osd_trans_exec_op(env, th, OSD_OT_INSERT); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) - RETURN(-EACCES); - LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!\n", PFID(fid)); rc = osd_remote_fid(env, osd, fid); @@ -4673,9 +4297,8 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, */ static struct dt_it *osd_it_iam_init(const struct lu_env *env, - struct dt_object *dt, - __u32 unused, - struct lustre_capa *capa) + struct dt_object *dt, + __u32 unused) { struct osd_it_iam *it; struct osd_object *obj = osd_dt_obj(dt); @@ -4686,9 +4309,6 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env, if (!dt_object_exists(dt)) return ERR_PTR(-ENOENT); - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) - return ERR_PTR(-EACCES); - OBD_ALLOC_PTR(it); if (it == NULL) return ERR_PTR(-ENOMEM); @@ -4957,8 +4577,7 @@ static const struct dt_index_operations osd_index_iam_ops = { */ static struct dt_it *osd_it_ea_init(const struct lu_env *env, struct dt_object *dt, - __u32 attr, - struct lustre_capa *capa) + __u32 attr) { struct osd_object *obj = osd_dt_obj(dt); struct osd_thread_info *info = osd_oti_get(env); @@ -5786,8 +5405,7 @@ static int osd_it_ea_load(const struct lu_env *env, * \retval -ve, on error */ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct osd_object *obj = osd_dt_obj(dt); int rc = 0; @@ -5797,9 +5415,6 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, LASSERT(S_ISDIR(obj->oo_inode->i_mode)); LINVRNT(osd_invariant(obj)); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) - return -EACCES; - rc = osd_ea_lookup_rec(env, obj, rec, key); if (rc == 0) rc = +1; @@ -5864,6 +5479,8 @@ static void osd_key_fini(const struct lu_context *ctx, { struct osd_thread_info *info = data; + if (info->oti_inode != NULL) + OBD_FREE_PTR(info->oti_inode); if (info->oti_hlock != NULL) ldiskfs_htree_lock_free(info->oti_hlock); OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); @@ -5986,7 +5603,7 @@ static int osd_mount(const struct lu_env *env, struct osd_thread_info *info = osd_oti_get(env); struct lu_fid *fid = &info->oti_fid; struct inode *inode; - int rc = 0; + int rc = 0, force_over_128tb = 0; ENTRY; if (o->od_mnt != NULL) @@ -5996,23 +5613,64 @@ static int osd_mount(const struct lu_env *env, RETURN(-E2BIG); strcpy(o->od_mntdev, dev); - OBD_PAGE_ALLOC(__page, GFP_IOFS); - if (__page == NULL) - GOTO(out, rc = -ENOMEM); - str = lustre_cfg_string(cfg, 2); s_flags = simple_strtoul(str, NULL, 0); str = strstr(str, ":"); if (str) lmd_flags = simple_strtoul(str + 1, NULL, 0); opts = lustre_cfg_string(cfg, 3); +#ifdef __BIG_ENDIAN + if (opts == NULL || strstr(opts, "bigendian_extents") == NULL) { + CERROR("%s: device %s extents feature is not guaranteed to " + "work on big-endian systems. Use \"bigendian_extents\" " + "mount option to override.\n", name, dev); + RETURN(-EINVAL); + } +#endif + if (opts != NULL && strstr(opts, "force_over_128tb") != NULL) + force_over_128tb = 1; + + OBD_PAGE_ALLOC(__page, GFP_IOFS); + if (__page == NULL) + GOTO(out, rc = -ENOMEM); page = (unsigned long)page_address(__page); options = (char *)page; *options = '\0'; - if (opts == NULL) - strcat(options, "user_xattr,acl"); - else + if (opts != NULL) { + /* strip out the options for back compatiblity */ + static char *sout[] = { + "mballoc", + "iopen", + "noiopen", + "iopen_nopriv", + "extents", + "noextents", + /* strip out option we processed in osd */ + "bigendian_extents", + "force_over_128tb", + NULL + }; strcat(options, opts); + for (rc = 0, str = options; sout[rc]; ) { + char *op = strstr(str, sout[rc]); + if (op == NULL) { + rc++; + str = options; + continue; + } + if (op == options || *(op - 1) == ',') { + str = op + strlen(sout[rc]); + if (*str == ',' || *str == '\0') { + *str == ',' ? str++ : str; + memmove(op, str, strlen(str) + 1); + } + } + for (str = op; *str != ',' && *str != '\0'; str++) + ; + } + } else { + strncat(options, "user_xattr,acl", 14); + } /* Glom up mount options */ if (*options != '\0') @@ -6035,6 +5693,15 @@ static int osd_mount(const struct lu_env *env, GOTO(out, rc); } + if (ldiskfs_blocks_count(LDISKFS_SB(osd_sb(o))->s_es) > (8ULL << 32) && + force_over_128tb == 0) { + CERROR("%s: device %s LDISKFS does not support filesystems " + "greater than 128TB and can cause data corruption. " + "Use \"force_over_128tb\" mount option to override.\n", + name, dev); + GOTO(out, rc = -EINVAL); + } + #ifdef HAVE_DEV_SET_RDONLY if (dev_check_rdonly(o->od_mnt->mnt_sb->s_bdev)) { CERROR("%s: underlying device %s is marked as read-only. " @@ -6120,10 +5787,6 @@ static int osd_device_init0(const struct lu_env *env, spin_lock_init(&o->od_osfs_lock); mutex_init(&o->od_otable_mutex); - o->od_capa_hash = init_capa_hash(); - if (o->od_capa_hash == NULL) - GOTO(out, rc = -ENOMEM); - o->od_read_cache = 1; o->od_writethrough_cache = 1; o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE; @@ -6132,7 +5795,7 @@ static int osd_device_init0(const struct lu_env *env, sizeof(o->od_svname)); if (cplen >= sizeof(o->od_svname)) { rc = -E2BIG; - GOTO(out_capa, rc); + GOTO(out, rc); } if (server_name_is_ost(o->od_svname)) @@ -6142,7 +5805,7 @@ static int osd_device_init0(const struct lu_env *env, o->od_full_scrub_threshold_rate = FULL_SCRUB_THRESHOLD_RATE_DEFAULT; rc = osd_mount(env, o, cfg); if (rc != 0) - GOTO(out_capa, rc); + GOTO(out, rc); rc = osd_obj_map_init(env, o); if (rc != 0) @@ -6194,8 +5857,6 @@ out_compat: osd_obj_map_fini(o); out_mnt: osd_umount(env, o); -out_capa: - cleanup_capa_hash(o->od_capa_hash); out: return rc; } @@ -6233,7 +5894,6 @@ static struct lu_device *osd_device_free(const struct lu_env *env, struct osd_device *o = osd_dev(d); ENTRY; - cleanup_capa_hash(o->od_capa_hash); /* XXX: make osd top device in order to release reference */ d->ld_site->ls_top_dev = d; lu_site_purge(env, d->ld_site, -1); @@ -6450,6 +6110,11 @@ static int __init osd_mod_init(void) { int rc; +#if !defined(CONFIG_DEBUG_MUTEXES) && !defined(CONFIG_DEBUG_SPINLOCK) + /* please, try to keep osd_thread_info smaller than a page */ + CLASSERT(sizeof(struct osd_thread_info) <= PAGE_SIZE); +#endif + osd_oi_mod_init(); rc = lu_kmem_init(ldiskfs_caches); @@ -6472,6 +6137,8 @@ static void __exit osd_mod_exit(void) MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_LDISKFS_NAME")"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); -cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit); +module_init(osd_mod_init); +module_exit(osd_mod_exit);