X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ffld%2Ffld_index.c;h=2a8d3141bae0a1c4e919772d046b29a676aa49aa;hb=155e4b6cf45cc0ab21f72d94e5cccbd7a0939c58;hp=aba0bb023b007266284b726219f478c98276632d;hpb=cefa8cda2ba2d288ccaa4ec077a6c627592503ea;p=fs%2Flustre-release.git diff --git a/lustre/fld/fld_index.c b/lustre/fld/fld_index.c index aba0bb0..2a8d314 100644 --- a/lustre/fld/fld_index.c +++ b/lustre/fld/fld_index.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,8 +24,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -38,9 +38,7 @@ * Author: WangDi * Author: Yury Umanets */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif + #define DEBUG_SUBSYSTEM S_FLD #ifdef __KERNEL__ @@ -60,32 +58,31 @@ #include #include #include +#include #include #include "fld_internal.h" const char fld_index_name[] = "fld"; -static const struct dt_index_features fld_index_features = { - .dif_flags = DT_IND_UPDATE, - .dif_keysize_min = sizeof(seqno_t), - .dif_keysize_max = sizeof(seqno_t), - .dif_recsize_min = sizeof(mdsno_t), - .dif_recsize_max = sizeof(mdsno_t) +static const struct lu_seq_range IGIF_FLD_RANGE = { + .lsr_start = 1, + .lsr_end = FID_SEQ_IDIF, + .lsr_index = 0, + .lsr_flags = LU_SEQ_RANGE_MDT }; -/* - * number of blocks to reserve for particular operations. Should be function of - * ... something. Stub for now. - */ -enum { - FLD_TXN_INDEX_INSERT_CREDITS = 20, - FLD_TXN_INDEX_DELETE_CREDITS = 20, +const struct dt_index_features fld_index_features = { + .dif_flags = DT_IND_UPDATE | DT_IND_RANGE, + .dif_keysize_min = sizeof(seqno_t), + .dif_keysize_max = sizeof(seqno_t), + .dif_recsize_min = sizeof(struct lu_seq_range), + .dif_recsize_max = sizeof(struct lu_seq_range), + .dif_ptrsize = 4 }; extern struct lu_context_key fld_thread_key; -static struct dt_key *fld_key(const struct lu_env *env, - const seqno_t seq) +static struct dt_key *fld_key(const struct lu_env *env, const seqno_t seq) { struct fld_thread_info *info; ENTRY; @@ -98,83 +95,234 @@ static struct dt_key *fld_key(const struct lu_env *env, } static struct dt_rec *fld_rec(const struct lu_env *env, - const mdsno_t mds) + const struct lu_seq_range *range) { struct fld_thread_info *info; + struct lu_seq_range *rec; ENTRY; info = lu_context_key_get(&env->le_ctx, &fld_thread_key); LASSERT(info != NULL); + rec = &info->fti_rec; + + range_cpu_to_be(rec, range); + RETURN((void *)rec); +} + +struct thandle *fld_trans_create(struct lu_server_fld *fld, + const struct lu_env *env) +{ + struct dt_device *dt_dev; + + dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); + + return dt_dev->dd_ops->dt_trans_create(env, dt_dev); +} + +int fld_trans_start(struct lu_server_fld *fld, + const struct lu_env *env, struct thandle *th) +{ + struct dt_device *dt_dev; + + dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); + + return dt_dev->dd_ops->dt_trans_start(env, dt_dev, th); +} + +void fld_trans_stop(struct lu_server_fld *fld, + const struct lu_env *env, struct thandle* th) +{ + struct dt_device *dt_dev; + + dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); + dt_dev->dd_ops->dt_trans_stop(env, th); +} + +int fld_declare_index_create(struct lu_server_fld *fld, + const struct lu_env *env, + const struct lu_seq_range *range, + struct thandle *th) +{ + struct dt_object *dt_obj = fld->lsf_obj; + seqno_t start; + int rc; + + ENTRY; - info->fti_rec = cpu_to_be64(mds); - RETURN((void *)&info->fti_rec); + if (fld->lsf_no_range_lookup) { + /* Stub for underlying FS which can't lookup ranges */ + return 0; + } + + start = range->lsr_start; + LASSERT(range_is_sane(range)); + + rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj, + fld_rec(env, range), + fld_key(env, start), th); + RETURN(rc); } +/** + * insert range in fld store. + * + * \param range range to be inserted + * \param th transaction for this operation as it could compound + * transaction. + * + * \retval 0 success + * \retval -ve error + */ + int fld_index_create(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq, mdsno_t mds) + const struct lu_seq_range *range, + struct thandle *th) { struct dt_object *dt_obj = fld->lsf_obj; - struct dt_device *dt_dev; - struct txn_param txn; - struct thandle *th; + seqno_t start; int rc; + ENTRY; - dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); + if (fld->lsf_no_range_lookup) { + /* Stub for underlying FS which can't lookup ranges */ + if (range->lsr_index != 0) { + CERROR("%s: FLD backend does not support range" + "lookups, so DNE and FIDs-on-OST are not" + "supported in this configuration\n", + fld->lsf_name); + return -EINVAL; + } + } + + start = range->lsr_start; + LASSERT(range_is_sane(range)); - /* stub here, will fix it later */ - txn_param_init(&txn, FLD_TXN_INDEX_INSERT_CREDITS); - - th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn); - if (!IS_ERR(th)) { - rc = dt_obj->do_index_ops->dio_insert(env, dt_obj, - fld_rec(env, mds), - fld_key(env, seq), - th, BYPASS_CAPA, 1); - dt_dev->dd_ops->dt_trans_stop(env, th); - } else - rc = PTR_ERR(th); + rc = dt_obj->do_index_ops->dio_insert(env, dt_obj, + fld_rec(env, range), + fld_key(env, start), + th, BYPASS_CAPA, 1); + + CDEBUG(D_INFO, "%s: insert given range : "DRANGE" rc = %d\n", + fld->lsf_name, PRANGE(range), rc); RETURN(rc); } +/** + * delete range in fld store. + * + * \param range range to be deleted + * \param th transaction + * + * \retval 0 success + * \retval -ve error + */ + int fld_index_delete(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq) + struct lu_seq_range *range, + struct thandle *th) { struct dt_object *dt_obj = fld->lsf_obj; - struct dt_device *dt_dev; - struct txn_param txn; - struct thandle *th; + seqno_t seq = range->lsr_start; int rc; + ENTRY; - dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev); - txn_param_init(&txn, FLD_TXN_INDEX_DELETE_CREDITS); - th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn); - if (!IS_ERR(th)) { - rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, - fld_key(env, seq), th, - BYPASS_CAPA); - dt_dev->dd_ops->dt_trans_stop(env, th); - } else - rc = PTR_ERR(th); + rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, fld_key(env, seq), + th, BYPASS_CAPA); + + CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n", + fld->lsf_name, PRANGE(range), rc); + RETURN(rc); } +/** + * lookup range for a seq passed. note here we only care about the start/end, + * caller should handle the attached location data (flags, index). + * + * \param seq seq for lookup. + * \param range result of lookup. + * + * \retval 0 found, \a range is the matched range; + * \retval -ENOENT not found, \a range is the left-side range; + * \retval -ve other error; + */ + int fld_index_lookup(struct lu_server_fld *fld, const struct lu_env *env, - seqno_t seq, mdsno_t *mds) + seqno_t seq, + struct lu_seq_range *range) { - struct dt_object *dt_obj = fld->lsf_obj; - struct dt_rec *rec = fld_rec(env, 0); + struct dt_object *dt_obj = fld->lsf_obj; + struct lu_seq_range *fld_rec; + struct dt_key *key = fld_key(env, seq); + struct fld_thread_info *info; int rc; + ENTRY; - rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec, - fld_key(env, seq), BYPASS_CAPA); - if (rc == 0) - *mds = be64_to_cpu(*(__u64 *)rec); + if (fld->lsf_no_range_lookup) { + /* Stub for underlying FS which can't lookup ranges */ + range->lsr_start = 0; + range->lsr_end = ~0; + range->lsr_index = 0; + range->lsr_flags = LU_SEQ_RANGE_MDT; + + range_cpu_to_be(range, range); + return 0; + } + + info = lu_context_key_get(&env->le_ctx, &fld_thread_key); + fld_rec = &info->fti_rec; + + rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, + (struct dt_rec*) fld_rec, + key, BYPASS_CAPA); + + if (rc >= 0) { + range_be_to_cpu(fld_rec, fld_rec); + *range = *fld_rec; + if (range_within(range, seq)) + rc = 0; + else + rc = -ENOENT; + } + + CDEBUG(D_INFO, "%s: lookup seq = "LPX64" range : "DRANGE" rc = %d\n", + fld->lsf_name, seq, PRANGE(range), rc); + + RETURN(rc); +} + +static int fld_insert_igif_fld(struct lu_server_fld *fld, + const struct lu_env *env) +{ + struct thandle *th; + int rc; + ENTRY; + + /* FLD_TXN_INDEX_INSERT_CREDITS */ + th = fld_trans_create(fld, env); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th); + if (rc) { + fld_trans_stop(fld, env, th); + RETURN(rc); + } + rc = fld_trans_start(fld, env, th); + if (rc) { + fld_trans_stop(fld, env, th); + RETURN(rc); + } + + rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th); + fld_trans_stop(fld, env, th); + if (rc == -EEXIST) + rc = 0; RETURN(rc); } @@ -184,19 +332,50 @@ int fld_index_init(struct lu_server_fld *fld, { struct dt_object *dt_obj; struct lu_fid fid; + struct lu_attr attr; + struct dt_object_format dof; int rc; ENTRY; - dt_obj = dt_store_open(env, dt, fld_index_name, &fid); + lu_local_obj_fid(&fid, FLD_INDEX_OID); + + memset(&attr, 0, sizeof(attr)); + attr.la_valid = LA_MODE; + attr.la_mode = S_IFREG | 0666; + dof.dof_type = DFT_INDEX; + dof.u.dof_idx.di_feat = &fld_index_features; + + dt_obj = dt_find_or_create(env, dt, &fid, &dof, &attr); if (!IS_ERR(dt_obj)) { fld->lsf_obj = dt_obj; rc = dt_obj->do_ops->do_index_try(env, dt_obj, &fld_index_features); - if (rc == 0) + if (rc == 0) { LASSERT(dt_obj->do_index_ops != NULL); - else - CERROR("%s: File \"%s\" is not an index!\n", - fld->lsf_name, fld_index_name); + rc = fld_insert_igif_fld(fld, env); + + if (rc != 0) { + CERROR("insert igif in fld! = %d\n", rc); + lu_object_put(env, &dt_obj->do_lu); + fld->lsf_obj = NULL; + } + } else if (rc == -ERANGE) { + CWARN("%s: File \"%s\" doesn't support range lookup, " + "using stub. DNE and FIDs on OST will not work " + "with this backend\n", + fld->lsf_name, fld_index_name); + + LASSERT(dt_obj->do_index_ops == NULL); + fld->lsf_no_range_lookup = 1; + rc = 0; + } else { + CERROR("%s: File \"%s\" is not index, rc %d!\n", + fld->lsf_name, fld_index_name, rc); + lu_object_put(env, &fld->lsf_obj->do_lu); + fld->lsf_obj = NULL; + } + + } else { CERROR("%s: Can't find \"%s\" obj %d\n", fld->lsf_name, fld_index_name, (int)PTR_ERR(dt_obj));