From 03f537c50b760b9319052f5600cbdb6ab2c2c94d Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Sun, 28 Oct 2012 21:38:11 +0300 Subject: [PATCH] LU-2244 lov: remove unused bits from lov, osc - precreation logic in OSC - QoS code in LOV - fake requests in ptlrpc Signed-off-by: Alex Zhuravlev Change-Id: I7a3e3ec6cf254a9fcd53ae7eab6c2d23b7520234 Reviewed-on: http://review.whamcloud.com/4399 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Mike Pershin --- lustre/include/lustre_export.h | 16 - lustre/include/lustre_net.h | 6 - lustre/include/obd.h | 2 - lustre/lov/Makefile.in | 2 +- lustre/lov/autoMakefile.am | 3 +- lustre/lov/lov_obd.c | 147 +---- lustre/lov/lov_qos.c | 1246 ---------------------------------------- lustre/lov/lov_request.c | 218 ------- lustre/lov/lproc_lov.c | 103 ---- lustre/osc/Makefile.in | 2 +- lustre/osc/autoMakefile.am | 3 +- lustre/osc/lproc_osc.c | 148 ----- lustre/osc/osc_create.c | 749 ------------------------ lustre/osc/osc_internal.h | 14 - lustre/osc/osc_request.c | 162 +----- lustre/osp/osp_precreate.c | 14 + lustre/ptlrpc/client.c | 93 +-- 17 files changed, 50 insertions(+), 2878 deletions(-) delete mode 100644 lustre/lov/lov_qos.c delete mode 100644 lustre/osc/osc_create.c diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 8dec5f2..166955b 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -83,22 +83,6 @@ struct mdt_export_data { struct lustre_idmap_table *med_idmap; }; -struct osc_creator { - cfs_spinlock_t oscc_lock; - cfs_list_t oscc_wait_create_list; - struct obd_device *oscc_obd; - obd_id oscc_last_id;//last available pre-created object - obd_id oscc_next_id;// what object id to give out next - int oscc_grow_count; - /** - * Limit oscc_grow_count value, can be changed via proc fs - */ - int oscc_max_grow_count; - struct obdo oscc_oa; - int oscc_flags; - cfs_waitq_t oscc_waitq; /* creating procs wait on this */ -}; - struct ec_export_data 
{ /* echo client */ cfs_list_t eced_locks; }; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index a218f55..0c00e64 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -708,7 +708,6 @@ struct ptlrpc_request { rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1, rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1, rq_early:1, rq_must_unlink:1, - rq_fake:1, /* this fake req */ rq_memalloc:1, /* req originated from "kswapd" */ /* server-side flags */ rq_packed_final:1, /* packed final reply */ @@ -1755,11 +1754,6 @@ struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp, int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, __u32 version, int opcode, char **bufs, struct ptlrpc_cli_ctx *ctx); -struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp, - unsigned int timeout, - ptlrpc_interpterer_t interpreter); -void ptlrpc_fakereq_finished(struct ptlrpc_request *req); - struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count, __u32 *lengths, char **bufs); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 9a30876..be65f34 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -519,7 +519,6 @@ struct client_obd { struct mdc_rpc_lock *cl_rpc_lock; struct mdc_rpc_lock *cl_close_lock; - struct osc_creator cl_oscc; /* mgc datastruct */ cfs_semaphore_t cl_mgc_sem; @@ -691,7 +690,6 @@ struct lov_obd { array */ cfs_mutex_t lov_lock; struct obd_connect_data lov_ocd; - struct lov_qos lov_qos; /* qos info per lov */ cfs_atomic_t lov_refcount; __u32 lov_tgt_count; /* how many OBD's */ __u32 lov_active_tgt_count; /* how many active */ diff --git a/lustre/lov/Makefile.in b/lustre/lov/Makefile.in index 59f7c79..9247372 100644 --- a/lustre/lov/Makefile.in +++ b/lustre/lov/Makefile.in @@ -1,5 +1,5 @@ MODULES := lov -lov-objs := lov_log.o lov_obd.o lov_pack.o lproc_lov.o lov_offset.o lov_merge.o lov_request.o lov_qos.o lov_ea.o lov_dev.o lov_object.o 
lov_page.o lov_lock.o lov_io.o lovsub_dev.o lovsub_object.o lovsub_page.o lovsub_lock.o lovsub_io.o lov_pool.o +lov-objs := lov_log.o lov_obd.o lov_pack.o lproc_lov.o lov_offset.o lov_merge.o lov_request.o lov_ea.o lov_dev.o lov_object.o lov_page.o lov_lock.o lov_io.o lovsub_dev.o lovsub_object.o lovsub_page.o lovsub_lock.o lovsub_io.o lov_pool.o EXTRA_DIST = $(lov-objs:.o=.c) lov_internal.h lov_cl_internal.h diff --git a/lustre/lov/autoMakefile.am b/lustre/lov/autoMakefile.am index 01120dc..82abfe9 100644 --- a/lustre/lov/autoMakefile.am +++ b/lustre/lov/autoMakefile.am @@ -36,7 +36,7 @@ if LIBLUSTRE noinst_LIBRARIES = liblov.a -liblov_a_SOURCES = lov_log.c lov_pool.c lov_obd.c lov_pack.c lov_request.c lov_offset.c lov_qos.c lov_merge.c lov_ea.c lov_internal.h lov_cl_internal.h lov_dev.c lov_object.c lov_page.c lov_lock.c lov_io.c lovsub_dev.c lovsub_object.c lovsub_page.c lovsub_lock.c lovsub_io.c +liblov_a_SOURCES = lov_log.c lov_pool.c lov_obd.c lov_pack.c lov_request.c lov_offset.c lov_merge.c lov_ea.c lov_internal.h lov_cl_internal.h lov_dev.c lov_object.c lov_page.c lov_lock.c lov_io.c lovsub_dev.c lovsub_object.c lovsub_page.c lovsub_lock.c lovsub_io.c liblov_a_CPPFLAGS = $(LLCPPFLAGS) liblov_a_CFLAGS = $(LLCFLAGS) endif @@ -56,7 +56,6 @@ lov_SOURCES = \ lov_pack.c \ lov_request.c \ lov_merge.c \ - lov_qos.c \ lov_dev.c \ lov_object.c \ lov_page.c \ diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 207f39c..4ac123c 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -221,10 +221,6 @@ int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, } #endif - rc = qos_add_tgt(obd, index); - if (rc) - CERROR("qos_add_tgt failed %d\n", rc); - RETURN(0); } @@ -334,8 +330,6 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt) rc = 0; } - qos_del_tgt(obd, tgt); - tgt->ltd_exp = NULL; RETURN(0); } @@ -441,8 +435,6 @@ static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid, 
lov->desc.ld_active_tgt_count--; lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1; } - /* remove any old qos penalty */ - lov->lov_tgts[index]->ltd_qos.ltq_penalty = 0; } else { CERROR("Unknown event(%d) for uuid %s", ev, uuid->uuid); } @@ -813,21 +805,7 @@ int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg) cfs_mutex_init(&lov->lov_lock); cfs_atomic_set(&lov->lov_refcount, 0); - CFS_INIT_LIST_HEAD(&lov->lov_qos.lq_oss_list); - cfs_init_rwsem(&lov->lov_qos.lq_rw_sem); lov->lov_sp_me = LUSTRE_SP_CLI; - lov->lov_qos.lq_dirty = 1; - lov->lov_qos.lq_rr.lqr_dirty = 1; - lov->lov_qos.lq_reset = 1; - /* Default priority is toward free space balance */ - lov->lov_qos.lq_prio_free = 232; - /* Default threshold for rr (roughly 17%) */ - lov->lov_qos.lq_threshold_rr = 43; - /* Init statfs fields */ - OBD_ALLOC_PTR(lov->lov_qos.lq_statfs_data); - if (NULL == lov->lov_qos.lq_statfs_data) - RETURN(-ENOMEM); - cfs_waitq_init(&lov->lov_qos.lq_statfs_waitq); lov->lov_pools_hash_body = cfs_hash_create("POOLS", HASH_POOLS_CUR_BITS, HASH_POOLS_MAX_BITS, @@ -840,10 +818,7 @@ int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg) lov->lov_pool_count = 0; rc = lov_ost_pool_init(&lov->lov_packed, 0); if (rc) - GOTO(out_free_statfs, rc); - rc = lov_ost_pool_init(&lov->lov_qos.lq_rr.lqr_pool, 0); - if (rc) - GOTO(out_free_lov_packed, rc); + GOTO(out, rc); lprocfs_lov_init_vars(&lvars); lprocfs_obd_setup(obd, lvars.obd_vars); @@ -863,10 +838,7 @@ int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg) RETURN(0); -out_free_lov_packed: - lov_ost_pool_free(&lov->lov_packed); -out_free_statfs: - OBD_FREE_PTR(lov->lov_qos.lq_statfs_data); +out: return rc; } @@ -914,7 +886,6 @@ static int lov_cleanup(struct obd_device *obd) lov_pool_del(obd, pool->pool_name); } cfs_hash_putref(lov->lov_pools_hash_body); - lov_ost_pool_free(&(lov->lov_qos.lq_rr.lqr_pool)); lov_ost_pool_free(&lov->lov_packed); lprocfs_obd_cleanup(obd); @@ -942,7 +913,6 @@ static int 
lov_cleanup(struct obd_device *obd) lov->lov_tgt_size); lov->lov_tgt_size = 0; } - OBD_FREE_PTR(lov->lov_qos.lq_statfs_data); RETURN(0); } @@ -1011,82 +981,6 @@ out: RETURN(rc); } -#ifndef log2 -#define log2(n) cfs_ffz(~(n)) -#endif - -static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, - struct lov_stripe_md **ea, - struct obd_trans_info *oti) -{ - struct lov_obd *lov; - struct obdo *tmp_oa; - struct obd_uuid *ost_uuid = NULL; - int rc = 0, i; - ENTRY; - - LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS && - src_oa->o_flags == OBD_FL_DELORPHAN); - - lov = &export->exp_obd->u.lov; - - OBDO_ALLOC(tmp_oa); - if (tmp_oa == NULL) - RETURN(-ENOMEM); - - if (oti->oti_ost_uuid) { - ost_uuid = oti->oti_ost_uuid; - CDEBUG(D_HA, "clearing orphans only for %s\n", - ost_uuid->uuid); - } - - obd_getref(export->exp_obd); - for (i = 0; i < lov->desc.ld_tgt_count; i++) { - struct lov_stripe_md obj_md; - struct lov_stripe_md *obj_mdp = &obj_md; - struct lov_tgt_desc *tgt; - int err; - - tgt = lov->lov_tgts[i]; - if (!tgt) - continue; - - /* if called for a specific target, we don't - care if it is not active. */ - if (!lov->lov_tgts[i]->ltd_active && ost_uuid == NULL) { - CDEBUG(D_HA, "lov idx %d inactive\n", i); - continue; - } - - if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid)) - continue; - - CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, - obd_uuid2str(ost_uuid)); - - memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); - - LASSERT(lov->lov_tgts[i]->ltd_exp); - /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ - err = obd_create(NULL, lov->lov_tgts[i]->ltd_exp, - tmp_oa, &obj_mdp, oti); - if (err) { - /* This export will be disabled until it is recovered, - and then orphan recovery will be completed. 
*/ - CERROR("error in orphan recovery on OST idx %d/%d: " - "rc = %d\n", i, lov->desc.ld_tgt_count, err); - rc = err; - } - - if (ost_uuid) - break; - } - obd_putref(export->exp_obd); - - OBDO_FREE(tmp_oa); - RETURN(rc); -} - static int lov_recreate(struct obd_export *exp, struct obdo *src_oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { @@ -1134,10 +1028,6 @@ static int lov_create(const struct lu_env *env, struct obd_export *exp, struct obd_trans_info *oti) { struct lov_obd *lov; - struct obd_info oinfo; - struct lov_request_set *set = NULL; - struct lov_request *req; - struct l_wait_info lwi = { 0 }; int rc = 0; ENTRY; @@ -1147,8 +1037,8 @@ static int lov_create(const struct lu_env *env, struct obd_export *exp, if ((src_oa->o_valid & OBD_MD_FLFLAGS) && src_oa->o_flags == OBD_FL_DELORPHAN) { - rc = lov_clear_orphans(exp, src_oa, ea, oti); - RETURN(rc); + /* should not be used with LOV anymore */ + LBUG(); } lov = &exp->exp_obd->u.lov; @@ -1160,37 +1050,8 @@ static int lov_create(const struct lu_env *env, struct obd_export *exp, if ((src_oa->o_valid & OBD_MD_FLFLAGS) && (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) { rc = lov_recreate(exp, src_oa, ea, oti); - GOTO(out, rc); - } - - /* issue statfs rpcs if the osfs data is older than qos_maxage - 1s, - * later in alloc_qos(), we will wait for those rpcs to complete if - * the osfs age is older than 2 * qos_maxage */ - qos_statfs_update(exp->exp_obd, - cfs_time_shift_64(-lov->desc.ld_qos_maxage + - OBD_STATFS_CACHE_SECONDS), - 0); - - rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set); - if (rc) - GOTO(out, rc); - - cfs_list_for_each_entry(req, &set->set_list, rq_link) { - /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ - rc = obd_create_async(lov->lov_tgts[req->rq_idx]->ltd_exp, - &req->rq_oi, &req->rq_oi.oi_md, oti); } - /* osc_create have timeout equ obd_timeout/2 so waiting don't be - * longer then this */ - l_wait_event(set->set_waitq, lov_set_finished(set, 1), &lwi); - - /* we not have ptlrpc 
set for assign set->interpret and should - * be call interpret function himself. calling from cb_create_update - * not permited because lov_fini_create_set can sleep for long time, - * but we must avoid sleeping in ptlrpcd interpret function. */ - rc = lov_fini_create_set(set, ea); -out: obd_putref(exp->exp_obd); RETURN(rc); } diff --git a/lustre/lov/lov_qos.c b/lustre/lov/lov_qos.c deleted file mode 100644 index cc11cd7..0000000 --- a/lustre/lov/lov_qos.c +++ /dev/null @@ -1,1246 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2011, 2012, Whamcloud, Inc. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. 
- */ - -#define DEBUG_SUBSYSTEM S_LOV - -#ifdef __KERNEL__ -#include -#else -#include -#endif - -#include -#include -#include -#include "lov_internal.h" - -/* #define QOS_DEBUG 1 */ -#define D_QOS D_OTHER - -#define TGT_BAVAIL(i) (lov->lov_tgts[i]->ltd_exp->exp_obd->obd_osfs.os_bavail *\ - lov->lov_tgts[i]->ltd_exp->exp_obd->obd_osfs.os_bsize) - - -int qos_add_tgt(struct obd_device *obd, __u32 index) -{ - struct lov_obd *lov = &obd->u.lov; - struct lov_qos_oss *oss, *temposs; - struct obd_export *exp = lov->lov_tgts[index]->ltd_exp; - int rc = 0, found = 0; - ENTRY; - - /* We only need this QOS struct on MDT, not clients - but we may not - * have registered the LOV's observer yet, so there's no way to know */ - if (!exp || !exp->exp_connection) { - CERROR("Missing connection\n"); - RETURN(-ENOTCONN); - } - - cfs_down_write(&lov->lov_qos.lq_rw_sem); - cfs_mutex_lock(&lov->lov_lock); - cfs_list_for_each_entry(oss, &lov->lov_qos.lq_oss_list, lqo_oss_list) { - if (obd_uuid_equals(&oss->lqo_uuid, - &exp->exp_connection->c_remote_uuid)) { - found++; - break; - } - } - - if (!found) { - OBD_ALLOC_PTR(oss); - if (!oss) - GOTO(out, rc = -ENOMEM); - memcpy(&oss->lqo_uuid, - &exp->exp_connection->c_remote_uuid, - sizeof(oss->lqo_uuid)); - } else { - /* Assume we have to move this one */ - cfs_list_del(&oss->lqo_oss_list); - } - - oss->lqo_ost_count++; - lov->lov_tgts[index]->ltd_qos.ltq_oss = oss; - - /* Add sorted by # of OSTs. Find the first entry that we're - bigger than... */ - cfs_list_for_each_entry(temposs, &lov->lov_qos.lq_oss_list, - lqo_oss_list) { - if (oss->lqo_ost_count > temposs->lqo_ost_count) - break; - } - /* ...and add before it. If we're the first or smallest, temposs - points to the list head, and we add to the end. 
*/ - cfs_list_add_tail(&oss->lqo_oss_list, &temposs->lqo_oss_list); - - lov->lov_qos.lq_dirty = 1; - lov->lov_qos.lq_rr.lqr_dirty = 1; - - CDEBUG(D_QOS, "add tgt %s to OSS %s (%d OSTs)\n", - obd_uuid2str(&lov->lov_tgts[index]->ltd_uuid), - obd_uuid2str(&oss->lqo_uuid), - oss->lqo_ost_count); - -out: - cfs_mutex_unlock(&lov->lov_lock); - cfs_up_write(&lov->lov_qos.lq_rw_sem); - RETURN(rc); -} - -int qos_del_tgt(struct obd_device *obd, struct lov_tgt_desc *tgt) -{ - struct lov_obd *lov = &obd->u.lov; - struct lov_qos_oss *oss; - int rc = 0; - ENTRY; - - cfs_down_write(&lov->lov_qos.lq_rw_sem); - - oss = tgt->ltd_qos.ltq_oss; - if (!oss) - GOTO(out, rc = -ENOENT); - - oss->lqo_ost_count--; - if (oss->lqo_ost_count == 0) { - CDEBUG(D_QOS, "removing OSS %s\n", - obd_uuid2str(&oss->lqo_uuid)); - cfs_list_del(&oss->lqo_oss_list); - OBD_FREE_PTR(oss); - } - - lov->lov_qos.lq_dirty = 1; - lov->lov_qos.lq_rr.lqr_dirty = 1; -out: - cfs_up_write(&lov->lov_qos.lq_rw_sem); - RETURN(rc); -} - -/* Recalculate per-object penalties for OSSs and OSTs, - depends on size of each ost in an oss */ -static int qos_calc_ppo(struct obd_device *obd) -{ - struct lov_obd *lov = &obd->u.lov; - struct lov_qos_oss *oss; - __u64 ba_max, ba_min, temp; - __u32 num_active; - int rc, i, prio_wide; - time_t now, age; - ENTRY; - - if (!lov->lov_qos.lq_dirty) - GOTO(out, rc = 0); - - num_active = lov->desc.ld_active_tgt_count - 1; - if (num_active < 1) - GOTO(out, rc = -EAGAIN); - - /* find bavail on each OSS */ - cfs_list_for_each_entry(oss, &lov->lov_qos.lq_oss_list, lqo_oss_list) { - oss->lqo_bavail = 0; - } - lov->lov_qos.lq_active_oss_count = 0; - - /* How badly user wants to select osts "widely" (not recently chosen - and not on recent oss's). As opposed to "freely" (free space - avail.) 0-256. 
*/ - prio_wide = 256 - lov->lov_qos.lq_prio_free; - - ba_min = (__u64)(-1); - ba_max = 0; - now = cfs_time_current_sec(); - /* Calculate OST penalty per object */ - /* (lov ref taken in alloc_qos) */ - for (i = 0; i < lov->desc.ld_tgt_count; i++) { - if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) - continue; - temp = TGT_BAVAIL(i); - if (!temp) - continue; - ba_min = min(temp, ba_min); - ba_max = max(temp, ba_max); - - /* Count the number of usable OSS's */ - if (lov->lov_tgts[i]->ltd_qos.ltq_oss->lqo_bavail == 0) - lov->lov_qos.lq_active_oss_count++; - lov->lov_tgts[i]->ltd_qos.ltq_oss->lqo_bavail += temp; - - /* per-OST penalty is prio * TGT_bavail / (num_ost - 1) / 2 */ - temp >>= 1; - lov_do_div64(temp, num_active); - lov->lov_tgts[i]->ltd_qos.ltq_penalty_per_obj = - (temp * prio_wide) >> 8; - - age = (now - lov->lov_tgts[i]->ltd_qos.ltq_used) >> 3; - if (lov->lov_qos.lq_reset || age > 32 * lov->desc.ld_qos_maxage) - lov->lov_tgts[i]->ltd_qos.ltq_penalty = 0; - else if (age > lov->desc.ld_qos_maxage) - /* Decay the penalty by half for every 8x the update - * interval that the device has been idle. That gives - * lots of time for the statfs information to be - * updated (which the penalty is only a proxy for), - * and avoids penalizing OSS/OSTs under light load. 
*/ - lov->lov_tgts[i]->ltd_qos.ltq_penalty >>= - (age / lov->desc.ld_qos_maxage); - } - - num_active = lov->lov_qos.lq_active_oss_count - 1; - if (num_active < 1) { - /* If there's only 1 OSS, we can't penalize it, so instead - we have to double the OST penalty */ - num_active = 1; - for (i = 0; i < lov->desc.ld_tgt_count; i++) { - if (lov->lov_tgts[i] == NULL) - continue; - lov->lov_tgts[i]->ltd_qos.ltq_penalty_per_obj <<= 1; - } - } - - /* Per-OSS penalty is prio * oss_avail / oss_osts / (num_oss - 1) / 2 */ - cfs_list_for_each_entry(oss, &lov->lov_qos.lq_oss_list, lqo_oss_list) { - temp = oss->lqo_bavail >> 1; - lov_do_div64(temp, oss->lqo_ost_count * num_active); - oss->lqo_penalty_per_obj = (temp * prio_wide) >> 8; - - age = (now - oss->lqo_used) >> 3; - if (lov->lov_qos.lq_reset || age > 32 * lov->desc.ld_qos_maxage) - oss->lqo_penalty = 0; - else if (age > lov->desc.ld_qos_maxage) - /* Decay the penalty by half for every 8x the update - * interval that the device has been idle. That gives - * lots of time for the statfs information to be - * updated (which the penalty is only a proxy for), - * and avoids penalizing OSS/OSTs under light load. 
*/ - oss->lqo_penalty >>= (age / lov->desc.ld_qos_maxage); - } - - lov->lov_qos.lq_dirty = 0; - lov->lov_qos.lq_reset = 0; - - /* If each ost has almost same free space, - * do rr allocation for better creation performance */ - lov->lov_qos.lq_same_space = 0; - if ((ba_max * (256 - lov->lov_qos.lq_threshold_rr)) >> 8 < ba_min) { - lov->lov_qos.lq_same_space = 1; - /* Reset weights for the next time we enter qos mode */ - lov->lov_qos.lq_reset = 1; - } - rc = 0; - -out: - if (!rc && lov->lov_qos.lq_same_space) - RETURN(-EAGAIN); - RETURN(rc); -} - -static int qos_calc_weight(struct lov_obd *lov, int i) -{ - __u64 temp, temp2; - - /* Final ost weight = TGT_BAVAIL - ost_penalty - oss_penalty */ - temp = TGT_BAVAIL(i); - temp2 = lov->lov_tgts[i]->ltd_qos.ltq_penalty + - lov->lov_tgts[i]->ltd_qos.ltq_oss->lqo_penalty; - if (temp < temp2) - lov->lov_tgts[i]->ltd_qos.ltq_weight = 0; - else - lov->lov_tgts[i]->ltd_qos.ltq_weight = temp - temp2; - return 0; -} - -/* We just used this index for a stripe; adjust everyone's weights */ -static int qos_used(struct lov_obd *lov, struct ost_pool *osts, - __u32 index, __u64 *total_wt) -{ - struct lov_qos_oss *oss; - int j; - ENTRY; - - /* Don't allocate from this stripe anymore, until the next alloc_qos */ - lov->lov_tgts[index]->ltd_qos.ltq_usable = 0; - - oss = lov->lov_tgts[index]->ltd_qos.ltq_oss; - - /* Decay old penalty by half (we're adding max penalty, and don't - want it to run away.) 
*/ - lov->lov_tgts[index]->ltd_qos.ltq_penalty >>= 1; - oss->lqo_penalty >>= 1; - - /* mark the OSS and OST as recently used */ - lov->lov_tgts[index]->ltd_qos.ltq_used = - oss->lqo_used = cfs_time_current_sec(); - - /* Set max penalties for this OST and OSS */ - lov->lov_tgts[index]->ltd_qos.ltq_penalty += - lov->lov_tgts[index]->ltd_qos.ltq_penalty_per_obj * - lov->desc.ld_active_tgt_count; - oss->lqo_penalty += oss->lqo_penalty_per_obj * - lov->lov_qos.lq_active_oss_count; - - /* Decrease all OSS penalties */ - cfs_list_for_each_entry(oss, &lov->lov_qos.lq_oss_list, lqo_oss_list) { - if (oss->lqo_penalty < oss->lqo_penalty_per_obj) - oss->lqo_penalty = 0; - else - oss->lqo_penalty -= oss->lqo_penalty_per_obj; - } - - *total_wt = 0; - /* Decrease all OST penalties */ - for (j = 0; j < osts->op_count; j++) { - int i; - - i = osts->op_array[j]; - if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) - continue; - if (lov->lov_tgts[i]->ltd_qos.ltq_penalty < - lov->lov_tgts[i]->ltd_qos.ltq_penalty_per_obj) - lov->lov_tgts[i]->ltd_qos.ltq_penalty = 0; - else - lov->lov_tgts[i]->ltd_qos.ltq_penalty -= - lov->lov_tgts[i]->ltd_qos.ltq_penalty_per_obj; - - qos_calc_weight(lov, i); - - /* Recalc the total weight of usable osts */ - if (lov->lov_tgts[i]->ltd_qos.ltq_usable) - *total_wt += lov->lov_tgts[i]->ltd_qos.ltq_weight; - -#ifdef QOS_DEBUG - CDEBUG(D_QOS, "recalc tgt %d usable=%d avail="LPU64 - " ostppo="LPU64" ostp="LPU64" ossppo="LPU64 - " ossp="LPU64" wt="LPU64"\n", - i, lov->lov_tgts[i]->ltd_qos.ltq_usable, - TGT_BAVAIL(i) >> 10, - lov->lov_tgts[i]->ltd_qos.ltq_penalty_per_obj >> 10, - lov->lov_tgts[i]->ltd_qos.ltq_penalty >> 10, - lov->lov_tgts[i]->ltd_qos.ltq_oss->lqo_penalty_per_obj>>10, - lov->lov_tgts[i]->ltd_qos.ltq_oss->lqo_penalty >> 10, - lov->lov_tgts[i]->ltd_qos.ltq_weight >> 10); -#endif - } - - RETURN(0); -} - -#define LOV_QOS_EMPTY ((__u32)-1) -/* compute optimal round-robin order, based on OSTs per OSS */ -static int qos_calc_rr(struct lov_obd 
*lov, struct ost_pool *src_pool, - struct lov_qos_rr *lqr) -{ - struct lov_qos_oss *oss; - unsigned placed, real_count; - int i, rc; - ENTRY; - - if (!lqr->lqr_dirty) { - LASSERT(lqr->lqr_pool.op_size); - RETURN(0); - } - - /* Do actual allocation. */ - cfs_down_write(&lov->lov_qos.lq_rw_sem); - - /* - * Check again. While we were sleeping on @lq_rw_sem something could - * change. - */ - if (!lqr->lqr_dirty) { - LASSERT(lqr->lqr_pool.op_size); - cfs_up_write(&lov->lov_qos.lq_rw_sem); - RETURN(0); - } - - real_count = src_pool->op_count; - - /* Zero the pool array */ - /* alloc_rr is holding a read lock on the pool, so nobody is adding/ - deleting from the pool. The lq_rw_sem insures that nobody else - is reading. */ - lqr->lqr_pool.op_count = real_count; - rc = lov_ost_pool_extend(&lqr->lqr_pool, real_count); - if (rc) { - cfs_up_write(&lov->lov_qos.lq_rw_sem); - RETURN(rc); - } - for (i = 0; i < lqr->lqr_pool.op_count; i++) - lqr->lqr_pool.op_array[i] = LOV_QOS_EMPTY; - - /* Place all the OSTs from 1 OSS at the same time. 
*/ - placed = 0; - cfs_list_for_each_entry(oss, &lov->lov_qos.lq_oss_list, lqo_oss_list) { - int j = 0; - for (i = 0; i < lqr->lqr_pool.op_count; i++) { - if (lov->lov_tgts[src_pool->op_array[i]] && - (lov->lov_tgts[src_pool->op_array[i]]->ltd_qos.ltq_oss == oss)) { - /* Evenly space these OSTs across arrayspace */ - int next = j * lqr->lqr_pool.op_count / oss->lqo_ost_count; - while (lqr->lqr_pool.op_array[next] != - LOV_QOS_EMPTY) - next = (next + 1) % lqr->lqr_pool.op_count; - lqr->lqr_pool.op_array[next] = src_pool->op_array[i]; - j++; - placed++; - } - } - } - - lqr->lqr_dirty = 0; - cfs_up_write(&lov->lov_qos.lq_rw_sem); - - if (placed != real_count) { - /* This should never happen */ - LCONSOLE_ERROR_MSG(0x14e, "Failed to place all OSTs in the " - "round-robin list (%d of %d).\n", - placed, real_count); - for (i = 0; i < lqr->lqr_pool.op_count; i++) { - LCONSOLE(D_WARNING, "rr #%d ost idx=%d\n", i, - lqr->lqr_pool.op_array[i]); - } - lqr->lqr_dirty = 1; - RETURN(-EAGAIN); - } - -#ifdef QOS_DEBUG - for (i = 0; i < lqr->lqr_pool.op_count; i++) { - LCONSOLE(D_QOS, "rr #%d ost idx=%d\n", i, - lqr->lqr_pool.op_array[i]); - } -#endif - - RETURN(0); -} - - -void qos_shrink_lsm(struct lov_request_set *set) -{ - struct lov_stripe_md *lsm = set->set_oi->oi_md, *lsm_new; - /* XXX LOV STACKING call into osc for sizes */ - unsigned oldsize, newsize; - - if (set->set_oti && set->set_cookies && set->set_cookie_sent) { - struct llog_cookie *cookies; - oldsize = lsm->lsm_stripe_count * sizeof(*cookies); - newsize = set->set_count * sizeof(*cookies); - - cookies = set->set_cookies; - oti_alloc_cookies(set->set_oti, set->set_count); - if (set->set_oti->oti_logcookies) { - memcpy(set->set_oti->oti_logcookies, cookies, newsize); - OBD_FREE_LARGE(cookies, oldsize); - set->set_cookies = set->set_oti->oti_logcookies; - } else { - CWARN("'leaking' %d bytes\n", oldsize - newsize); - } - } - - CWARN("using fewer stripes for object "LPU64": old %u new %u\n", - lsm->lsm_object_id, 
lsm->lsm_stripe_count, set->set_count); - LASSERT(lsm->lsm_stripe_count >= set->set_count); - - newsize = lov_stripe_md_size(set->set_count); - OBD_ALLOC_LARGE(lsm_new, newsize); - if (lsm_new != NULL) { - int i; - memcpy(lsm_new, lsm, sizeof(*lsm)); - for (i = 0; i < lsm->lsm_stripe_count; i++) { - if (i < set->set_count) { - lsm_new->lsm_oinfo[i] = lsm->lsm_oinfo[i]; - continue; - } - OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab, - sizeof(struct lov_oinfo)); - } - lsm_new->lsm_stripe_count = set->set_count; - OBD_FREE_LARGE(lsm, sizeof(struct lov_stripe_md) + - lsm->lsm_stripe_count*sizeof(struct lov_oinfo*)); - set->set_oi->oi_md = lsm_new; - } else { - CWARN("'leaking' few bytes\n"); - } -} - -/** - * Check whether we can create the object on the OST(refered by ost_idx) - * \retval: - * 0: create the object. - * other value: did not create the object. - */ -static int lov_check_and_create_object(struct lov_obd *lov, int ost_idx, - struct lov_stripe_md *lsm, - struct lov_request *req, - struct obd_trans_info *oti) -{ - __u16 stripe; - int rc = -EIO; - ENTRY; - - CDEBUG(D_QOS, "Check and create on idx %d \n", ost_idx); - if (!lov->lov_tgts[ost_idx] || - !lov->lov_tgts[ost_idx]->ltd_active) - RETURN(rc); - - /* check if objects has been created on this ost */ - for (stripe = 0; stripe < lsm->lsm_stripe_count; stripe++) { - /* already have object at this stripe */ - if (ost_idx == lsm->lsm_oinfo[stripe]->loi_ost_idx) - break; - } - - if (stripe >= lsm->lsm_stripe_count) { - req->rq_idx = ost_idx; - rc = obd_create(NULL, lov->lov_tgts[ost_idx]->ltd_exp, - req->rq_oi.oi_oa, &req->rq_oi.oi_md, - oti); - } - RETURN(rc); -} - -int qos_remedy_create(struct lov_request_set *set, struct lov_request *req) -{ - struct lov_stripe_md *lsm = set->set_oi->oi_md; - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; - unsigned ost_idx; - unsigned ost_count; - struct pool_desc *pool; - struct ost_pool *osts = NULL; - int i; - int rc = -EIO; - ENTRY; - - /* First check whether 
we can create the objects on the pool */ - /* In the function below, .hs_keycmp resolves to - * pool_hashkey_keycmp() */ - /* coverity[overrun-buffer-val] */ - pool = lov_find_pool(lov, lsm->lsm_pool_name); - if (pool != NULL) { - cfs_down_read(&pool_tgt_rw_sem(pool)); - osts = &(pool->pool_obds); - ost_count = osts->op_count; - for (i = 0, ost_idx = osts->op_array[0]; i < ost_count; - i++, ost_idx = osts->op_array[i]) { - rc = lov_check_and_create_object(lov, ost_idx, lsm, req, - set->set_oti); - if (rc == 0) - break; - } - cfs_up_read(&pool_tgt_rw_sem(pool)); - lov_pool_putref(pool); - RETURN(rc); - } - - ost_count = lov->desc.ld_tgt_count; - /* Then check whether we can create the objects on other OSTs */ - ost_idx = (req->rq_idx + lsm->lsm_stripe_count) % ost_count; - for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) { - rc = lov_check_and_create_object(lov, ost_idx, lsm, req, - set->set_oti); - - if (rc == 0) - break; - } - - RETURN(rc); -} - -static int min_stripe_count(int stripe_cnt, int flags) -{ - return (flags & LOV_USES_DEFAULT_STRIPE ? 
- stripe_cnt - (stripe_cnt / 4) : stripe_cnt); -} - -#define LOV_CREATE_RESEED_MULT 30 -#define LOV_CREATE_RESEED_MIN 2000 -/* Allocate objects on osts with round-robin algorithm */ -static int alloc_rr(struct lov_obd *lov, int *idx_arr, int *stripe_cnt, - char *poolname, int flags) -{ - unsigned array_idx; - int i, rc, *idx_pos; - __u32 ost_idx; - int ost_start_idx_temp; - int speed = 0; - int stripe_cnt_min = min_stripe_count(*stripe_cnt, flags); - struct pool_desc *pool; - struct ost_pool *osts; - struct lov_qos_rr *lqr; - ENTRY; - - pool = lov_find_pool(lov, poolname); - if (pool == NULL) { - osts = &(lov->lov_packed); - lqr = &(lov->lov_qos.lq_rr); - } else { - cfs_down_read(&pool_tgt_rw_sem(pool)); - osts = &(pool->pool_obds); - lqr = &(pool->pool_rr); - } - - rc = qos_calc_rr(lov, osts, lqr); - if (rc) - GOTO(out, rc); - - if (--lqr->lqr_start_count <= 0) { - lqr->lqr_start_idx = cfs_rand() % osts->op_count; - lqr->lqr_start_count = - (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) + - LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U); - } else if (stripe_cnt_min >= osts->op_count || - lqr->lqr_start_idx > osts->op_count) { - /* If we have allocated from all of the OSTs, slowly - * precess the next start if the OST/stripe count isn't - * already doing this for us. 
*/ - lqr->lqr_start_idx %= osts->op_count; - if (*stripe_cnt > 1 && (osts->op_count % (*stripe_cnt)) != 1) - ++lqr->lqr_offset_idx; - } - cfs_down_read(&lov->lov_qos.lq_rw_sem); - ost_start_idx_temp = lqr->lqr_start_idx; - -repeat_find: - array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) % osts->op_count; - idx_pos = idx_arr; -#ifdef QOS_DEBUG - CDEBUG(D_QOS, "pool '%s' want %d startidx %d startcnt %d offset %d " - "active %d count %d arrayidx %d\n", poolname, - *stripe_cnt, lqr->lqr_start_idx, lqr->lqr_start_count, - lqr->lqr_offset_idx, osts->op_count, osts->op_count, array_idx); -#endif - - for (i = 0; i < osts->op_count; - i++, array_idx=(array_idx + 1) % osts->op_count) { - ++lqr->lqr_start_idx; - ost_idx = lqr->lqr_pool.op_array[array_idx]; -#ifdef QOS_DEBUG - CDEBUG(D_QOS, "#%d strt %d act %d strp %d ary %d idx %d\n", - i, lqr->lqr_start_idx, - ((ost_idx != LOV_QOS_EMPTY) && lov->lov_tgts[ost_idx]) ? - lov->lov_tgts[ost_idx]->ltd_active : 0, - idx_pos - idx_arr, array_idx, ost_idx); -#endif - if ((ost_idx == LOV_QOS_EMPTY) || !lov->lov_tgts[ost_idx] || - !lov->lov_tgts[ost_idx]->ltd_active) - continue; - - /* Fail Check before osc_precreate() is called - so we can only 'fail' single OSC. 
*/ - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0) - continue; - - /* Drop slow OSCs if we can */ - if (obd_precreate(lov->lov_tgts[ost_idx]->ltd_exp) > speed) - continue; - - *idx_pos = ost_idx; - idx_pos++; - /* We have enough stripes */ - if (idx_pos - idx_arr == *stripe_cnt) - break; - } - if ((speed < 2) && (idx_pos - idx_arr < stripe_cnt_min)) { - /* Try again, allowing slower OSCs */ - speed++; - lqr->lqr_start_idx = ost_start_idx_temp; - goto repeat_find; - } - - cfs_up_read(&lov->lov_qos.lq_rw_sem); - - *stripe_cnt = idx_pos - idx_arr; -out: - if (pool != NULL) { - cfs_up_read(&pool_tgt_rw_sem(pool)); - /* put back ref got by lov_find_pool() */ - lov_pool_putref(pool); - } - - RETURN(rc); -} - -/* alloc objects on osts with specific stripe offset */ -static int alloc_specific(struct lov_obd *lov, struct lov_stripe_md *lsm, - int *idx_arr) -{ - unsigned ost_idx, array_idx, ost_count; - int i, rc, *idx_pos; - int speed = 0; - struct pool_desc *pool; - struct ost_pool *osts; - ENTRY; - - /* In the function below, .hs_keycmp resolves to - * pool_hashkey_keycmp() */ - /* coverity[overrun-buffer-val] */ - pool = lov_find_pool(lov, lsm->lsm_pool_name); - if (pool == NULL) { - osts = &(lov->lov_packed); - } else { - cfs_down_read(&pool_tgt_rw_sem(pool)); - osts = &(pool->pool_obds); - } - - ost_count = osts->op_count; - -repeat_find: - /* search loi_ost_idx in ost array */ - array_idx = 0; - for (i = 0; i < ost_count; i++) { - if (osts->op_array[i] == lsm->lsm_oinfo[0]->loi_ost_idx) { - array_idx = i; - break; - } - } - if (i == ost_count) { - CERROR("Start index %d not found in pool '%s'\n", - lsm->lsm_oinfo[0]->loi_ost_idx, lsm->lsm_pool_name); - GOTO(out, rc = -EINVAL); - } - - idx_pos = idx_arr; - for (i = 0; i < ost_count; - i++, array_idx = (array_idx + 1) % ost_count) { - ost_idx = osts->op_array[array_idx]; - - if (!lov->lov_tgts[ost_idx] || - !lov->lov_tgts[ost_idx]->ltd_active) { - continue; - } - - /* Fail Check before osc_precreate() 
is called - so we can only 'fail' single OSC. */ - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0) - continue; - - /* Drop slow OSCs if we can, but not for requested start idx. - * - * This means "if OSC is slow and it is not the requested - * start OST, then it can be skipped, otherwise skip it only - * if it is inactive/recovering/out-of-space." */ - if ((obd_precreate(lov->lov_tgts[ost_idx]->ltd_exp) > speed) && - (i != 0 || speed >= 2)) - continue; - - *idx_pos = ost_idx; - idx_pos++; - /* We have enough stripes */ - if (idx_pos - idx_arr == lsm->lsm_stripe_count) - GOTO(out, rc = 0); - } - if (speed < 2) { - /* Try again, allowing slower OSCs */ - speed++; - goto repeat_find; - } - - /* If we were passed specific striping params, then a failure to - * meet those requirements is an error, since we can't reallocate - * that memory (it might be part of a larger array or something). - * - * We can only get here if lsm_stripe_count was originally > 1. - */ - CERROR("can't lstripe objid "LPX64": have %d want %u\n", - lsm->lsm_object_id, (int)(idx_pos - idx_arr), - lsm->lsm_stripe_count); - rc = -EFBIG; -out: - if (pool != NULL) { - cfs_up_read(&pool_tgt_rw_sem(pool)); - /* put back ref got by lov_find_pool() */ - lov_pool_putref(pool); - } - - RETURN(rc); -} - -/* Alloc objects on osts with optimization based on: - - free space - - network resources (shared OSS's) -*/ -static int alloc_qos(struct obd_export *exp, int *idx_arr, int *stripe_cnt, - char *poolname, int flags) -{ - struct lov_obd *lov = &exp->exp_obd->u.lov; - __u64 total_weight = 0; - int nfound, good_osts, i, rc = 0; - int stripe_cnt_min = min_stripe_count(*stripe_cnt, flags); - struct pool_desc *pool; - struct ost_pool *osts; - ENTRY; - - if (stripe_cnt_min < 1) - RETURN(-EINVAL); - - pool = lov_find_pool(lov, poolname); - if (pool == NULL) { - osts = &(lov->lov_packed); - } else { - cfs_down_read(&pool_tgt_rw_sem(pool)); - osts = &(pool->pool_obds); - } - - obd_getref(exp->exp_obd); - 
- /* wait for fresh statfs info if needed, the rpcs are sent in - * lov_create() */ - qos_statfs_update(exp->exp_obd, - cfs_time_shift_64(-2 * lov->desc.ld_qos_maxage), 1); - - /* Detect -EAGAIN early, before expensive lock is taken. */ - if (!lov->lov_qos.lq_dirty && lov->lov_qos.lq_same_space) - GOTO(out_nolock, rc = -EAGAIN); - - /* Do actual allocation, use write lock here. */ - cfs_down_write(&lov->lov_qos.lq_rw_sem); - - /* - * Check again, while we were sleeping on @lq_rw_sem things could - * change. - */ - if (!lov->lov_qos.lq_dirty && lov->lov_qos.lq_same_space) - GOTO(out, rc = -EAGAIN); - - if (lov->desc.ld_active_tgt_count < 2) - GOTO(out, rc = -EAGAIN); - - rc = qos_calc_ppo(exp->exp_obd); - if (rc) - GOTO(out, rc); - - good_osts = 0; - /* Find all the OSTs that are valid stripe candidates */ - for (i = 0; i < osts->op_count; i++) { - if (!lov->lov_tgts[osts->op_array[i]] || - !lov->lov_tgts[osts->op_array[i]]->ltd_active) - continue; - - /* Fail Check before osc_precreate() is called - so we can only 'fail' single OSC. */ - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && osts->op_array[i] == 0) - continue; - - if (obd_precreate(lov->lov_tgts[osts->op_array[i]]->ltd_exp) > 2) - continue; - - lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_usable = 1; - qos_calc_weight(lov, osts->op_array[i]); - total_weight += lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_weight; - - good_osts++; - } - -#ifdef QOS_DEBUG - CDEBUG(D_QOS, "found %d good osts\n", good_osts); -#endif - - if (good_osts < stripe_cnt_min) - GOTO(out, rc = -EAGAIN); - - /* We have enough osts */ - if (good_osts < *stripe_cnt) - *stripe_cnt = good_osts; - - if (!*stripe_cnt) - GOTO(out, rc = -EAGAIN); - - /* Find enough OSTs with weighted random allocation. 
*/ - nfound = 0; - while (nfound < *stripe_cnt) { - __u64 rand, cur_weight; - - cur_weight = 0; - rc = -ENODEV; - - if (total_weight) { -#if BITS_PER_LONG == 32 - rand = cfs_rand() % (unsigned)total_weight; - /* If total_weight > 32-bit, first generate the high - * 32 bits of the random number, then add in the low - * 32 bits (truncated to the upper limit, if needed) */ - if (total_weight > 0xffffffffULL) - rand = (__u64)(cfs_rand() % - (unsigned)(total_weight >> 32)) << 32; - else - rand = 0; - - if (rand == (total_weight & 0xffffffff00000000ULL)) - rand |= cfs_rand() % (unsigned)total_weight; - else - rand |= cfs_rand(); - -#else - rand = ((__u64)cfs_rand() << 32 | cfs_rand()) % - total_weight; -#endif - } else { - rand = 0; - } - - /* On average, this will hit larger-weighted osts more often. - 0-weight osts will always get used last (only when rand=0).*/ - for (i = 0; i < osts->op_count; i++) { - if (!lov->lov_tgts[osts->op_array[i]] || - !lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_usable) - continue; - - cur_weight += lov->lov_tgts[osts->op_array[i]]->ltd_qos.ltq_weight; -#ifdef QOS_DEBUG - CDEBUG(D_QOS, "stripe_cnt=%d nfound=%d cur_weight="LPU64 - " rand="LPU64" total_weight="LPU64"\n", - *stripe_cnt, nfound, cur_weight, rand, total_weight); -#endif - if (cur_weight >= rand) { -#ifdef QOS_DEBUG - CDEBUG(D_QOS, "assigned stripe=%d to idx=%d\n", - nfound, osts->op_array[i]); -#endif - idx_arr[nfound++] = osts->op_array[i]; - qos_used(lov, osts, osts->op_array[i], &total_weight); - rc = 0; - break; - } - } - /* should never satisfy below condition */ - if (rc) { - CERROR("Didn't find any OSTs?\n"); - break; - } - } - LASSERT(nfound == *stripe_cnt); - -out: - cfs_up_write(&lov->lov_qos.lq_rw_sem); - -out_nolock: - if (pool != NULL) { - cfs_up_read(&pool_tgt_rw_sem(pool)); - /* put back ref got by lov_find_pool() */ - lov_pool_putref(pool); - } - - if (rc == -EAGAIN) - rc = alloc_rr(lov, idx_arr, stripe_cnt, poolname, flags); - - obd_putref(exp->exp_obd); - 
RETURN(rc); -} - -/* return new alloced stripe count on success */ -static int alloc_idx_array(struct obd_export *exp, struct lov_stripe_md *lsm, - int newea, int **idx_arr, int *arr_cnt, int flags) -{ - struct lov_obd *lov = &exp->exp_obd->u.lov; - int stripe_cnt = lsm->lsm_stripe_count; - int i, rc = 0; - int *tmp_arr = NULL; - ENTRY; - - *arr_cnt = stripe_cnt; - OBD_ALLOC(tmp_arr, *arr_cnt * sizeof(int)); - if (tmp_arr == NULL) - RETURN(-ENOMEM); - for (i = 0; i < *arr_cnt; i++) - tmp_arr[i] = -1; - - if (newea || - lsm->lsm_oinfo[0]->loi_ost_idx >= lov->desc.ld_tgt_count) - /* In the function below, .hs_keycmp resolves to - * pool_hashkey_keycmp() */ - /* coverity[overrun-buffer-val] */ - rc = alloc_qos(exp, tmp_arr, &stripe_cnt, - lsm->lsm_pool_name, flags); - else - rc = alloc_specific(lov, lsm, tmp_arr); - - if (rc) - GOTO(out_arr, rc); - - *idx_arr = tmp_arr; - RETURN(stripe_cnt); -out_arr: - OBD_FREE(tmp_arr, *arr_cnt * sizeof(int)); - *arr_cnt = 0; - RETURN(rc); -} - -static void free_idx_array(int *idx_arr, int arr_cnt) -{ - if (arr_cnt) - OBD_FREE(idx_arr, arr_cnt * sizeof(int)); -} - -int qos_prep_create(struct obd_export *exp, struct lov_request_set *set) -{ - struct lov_obd *lov = &exp->exp_obd->u.lov; - struct lov_stripe_md *lsm; - struct obdo *src_oa = set->set_oi->oi_oa; - struct obd_trans_info *oti = set->set_oti; - int i, stripes, rc = 0, newea = 0; - int flag = LOV_USES_ASSIGNED_STRIPE; - int *idx_arr = NULL, idx_cnt = 0; - ENTRY; - - LASSERT(src_oa->o_valid & OBD_MD_FLID); - LASSERT(src_oa->o_valid & OBD_MD_FLGROUP); - - if (set->set_oi->oi_md == NULL) { - __u16 stripes_def = lov_get_stripecnt(lov, LOV_MAGIC, 0); - - /* If the MDS file was truncated up to some size, stripe over - * enough OSTs to allow the file to be created at that size. - * This may mean we use more than the default # of stripes. 
*/ - if (src_oa->o_valid & OBD_MD_FLSIZE) { - obd_size min_bavail = LUSTRE_STRIPE_MAXBYTES; - - /* Find a small number of stripes we can use - (up to # of active osts). */ - stripes = 1; - for (i = 0; i < lov->desc.ld_tgt_count; i++) { - if (!lov->lov_tgts[i] || - !lov->lov_tgts[i]->ltd_active) - continue; - min_bavail = min(min_bavail, TGT_BAVAIL(i)); - if (min_bavail * stripes > src_oa->o_size) - break; - stripes++; - } - - if (stripes < stripes_def) - stripes = stripes_def; - } else { - flag = LOV_USES_DEFAULT_STRIPE; - stripes = stripes_def; - } - - rc = lov_alloc_memmd(&set->set_oi->oi_md, stripes, - lov->desc.ld_pattern ? - lov->desc.ld_pattern : LOV_PATTERN_RAID0, - LOV_MAGIC); - if (rc < 0) - GOTO(out_err, rc); - newea = 1; - rc = 0; - } - - lsm = set->set_oi->oi_md; - lsm->lsm_object_id = src_oa->o_id; - lsm->lsm_object_seq = src_oa->o_seq; - lsm->lsm_layout_gen = 0; /* actual generation set in mdd_lov_create() */ - - if (!lsm->lsm_stripe_size) - lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size; - if (!lsm->lsm_pattern) { - LASSERT(lov->desc.ld_pattern); - lsm->lsm_pattern = lov->desc.ld_pattern; - } - - stripes = alloc_idx_array(exp, lsm, newea, &idx_arr, &idx_cnt, flag); - if (stripes <= 0) - GOTO(out_err, rc = stripes ? 
stripes : -EIO); - LASSERTF(stripes <= lsm->lsm_stripe_count,"requested %d allocated %d\n", - lsm->lsm_stripe_count, stripes); - - for (i = 0; i < stripes; i++) { - struct lov_request *req; - int ost_idx = idx_arr[i]; - LASSERT(ost_idx >= 0); - - OBD_ALLOC(req, sizeof(*req)); - if (req == NULL) - GOTO(out_err, rc = -ENOMEM); - lov_set_add_req(req, set); - - req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); - if (req->rq_oi.oi_md == NULL) - GOTO(out_err, rc = -ENOMEM); - - OBDO_ALLOC(req->rq_oi.oi_oa); - if (req->rq_oi.oi_oa == NULL) - GOTO(out_err, rc = -ENOMEM); - - req->rq_idx = ost_idx; - req->rq_stripe = i; - /* create data objects with "parent" OA */ - memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); - req->rq_oi.oi_cb_up = cb_create_update; - - /* XXX When we start creating objects on demand, we need to - * make sure that we always create the object on the - * stripe which holds the existing file size. - */ - if (src_oa->o_valid & OBD_MD_FLSIZE) { - req->rq_oi.oi_oa->o_size = - lov_size_to_stripe(lsm, src_oa->o_size, i); - - CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n", - i, req->rq_oi.oi_oa->o_size, src_oa->o_size); - } - } - LASSERT(set->set_count == stripes); - - if (stripes < lsm->lsm_stripe_count) - qos_shrink_lsm(set); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LOV_PREP_CREATE)) { - qos_shrink_lsm(set); - rc = -EIO; - } - - if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) { - oti_alloc_cookies(oti, set->set_count); - if (!oti->oti_logcookies) - GOTO(out_err, rc = -ENOMEM); - set->set_cookies = oti->oti_logcookies; - } -out_err: - if (newea && rc) - obd_free_memmd(exp, &set->set_oi->oi_md); - if (idx_arr) - free_idx_array(idx_arr, idx_cnt); - EXIT; - return rc; -} - -void qos_update(struct lov_obd *lov) -{ - ENTRY; - lov->lov_qos.lq_dirty = 1; -} - -void qos_statfs_done(struct lov_obd *lov) -{ - cfs_down_write(&lov->lov_qos.lq_rw_sem); - if (lov->lov_qos.lq_statfs_in_progress) { - 
lov->lov_qos.lq_statfs_in_progress = 0; - /* wake up any threads waiting for the statfs rpcs to complete*/ - cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq); - } - cfs_up_write(&lov->lov_qos.lq_rw_sem); -} - -static int qos_statfs_ready(struct obd_device *obd, __u64 max_age) -{ - struct lov_obd *lov = &obd->u.lov; - int rc; - ENTRY; - cfs_down_read(&lov->lov_qos.lq_rw_sem); - rc = lov->lov_qos.lq_statfs_in_progress == 0 || - cfs_time_beforeq_64(max_age, obd->obd_osfs_age); - cfs_up_read(&lov->lov_qos.lq_rw_sem); - RETURN(rc); -} - -/* - * Update statfs data if the current osfs age is older than max_age. - * If wait is not set, it means that we are called from lov_create() - * and we should just issue the rpcs without waiting for them to complete. - * If wait is set, we are called from alloc_qos() and we just have - * to wait for the request set to complete. - */ -void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait) -{ - struct lov_obd *lov = &obd->u.lov; - struct obd_info *oinfo; - int rc = 0; - struct ptlrpc_request_set *set = NULL; - ENTRY; - - if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age)) - /* statfs data are quite recent, don't need to refresh it */ - RETURN_EXIT; - - if (!wait && lov->lov_qos.lq_statfs_in_progress) - /* statfs already in progress */ - RETURN_EXIT; - - cfs_down_write(&lov->lov_qos.lq_rw_sem); - if (lov->lov_qos.lq_statfs_in_progress) { - cfs_up_write(&lov->lov_qos.lq_rw_sem); - GOTO(out, rc = 0); - } - /* no statfs in flight, send rpcs */ - lov->lov_qos.lq_statfs_in_progress = 1; - cfs_up_write(&lov->lov_qos.lq_rw_sem); - - if (wait) - CDEBUG(D_QOS, "%s: did not manage to get fresh statfs data " - "in a timely manner (osfs age "LPU64", max age "LPU64")" - ", sending new statfs rpcs\n", - obd_uuid2str(&lov->desc.ld_uuid), obd->obd_osfs_age, - max_age); - - /* need to send statfs rpcs */ - CDEBUG(D_QOS, "sending new statfs requests\n"); - memset(lov->lov_qos.lq_statfs_data, 0, - sizeof(*lov->lov_qos.lq_statfs_data)); - 
oinfo = &lov->lov_qos.lq_statfs_data->lsd_oi; - oinfo->oi_osfs = &lov->lov_qos.lq_statfs_data->lsd_statfs; - oinfo->oi_flags = OBD_STATFS_NODELAY; - set = ptlrpc_prep_set(); - if (!set) - GOTO(out_failed, rc = -ENOMEM); - - rc = obd_statfs_async(obd->obd_self_export, oinfo, max_age, set); - if (rc || cfs_list_empty(&set->set_requests)) { - if (rc) - CWARN("statfs failed with %d\n", rc); - GOTO(out_failed, rc); - } - /* send requests via ptlrpcd */ - oinfo->oi_flags |= OBD_STATFS_PTLRPCD; - ptlrpcd_add_rqset(set); - GOTO(out, rc); - -out_failed: - cfs_down_write(&lov->lov_qos.lq_rw_sem); - lov->lov_qos.lq_statfs_in_progress = 0; - /* wake up any threads waiting for the statfs rpcs to complete */ - cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq); - cfs_up_write(&lov->lov_qos.lq_rw_sem); - wait = 0; -out: - if (set) - ptlrpc_set_destroy(set); - if (wait) { - struct l_wait_info lwi = { 0 }; - CDEBUG(D_QOS, "waiting for statfs requests to complete\n"); - l_wait_event(lov->lov_qos.lq_statfs_waitq, - qos_statfs_ready(obd, max_age), &lwi); - if (cfs_time_before_64(obd->obd_osfs_age, max_age)) - CDEBUG(D_QOS, "%s: still no fresh statfs data after " - "waiting (osfs age "LPU64", max age " - LPU64")\n", - obd_uuid2str(&lov->desc.ld_uuid), - obd->obd_osfs_age, max_age); - } -} diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index e049e18..c972437 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -572,221 +572,6 @@ out_set: lov_fini_cancel_set(set); RETURN(rc); } - -static int lov_update_create_set(struct lov_request_set *set, - struct lov_request *req, int rc) -{ - struct obd_trans_info *oti = set->set_oti; - struct lov_stripe_md *lsm = set->set_oi->oi_md; - struct lov_oinfo *loi; - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; - ENTRY; - - if (rc && lov->lov_tgts[req->rq_idx] && - lov->lov_tgts[req->rq_idx]->ltd_active) { - /* Pre-creating objects may timeout via -ETIMEDOUT or - * -ENOTCONN both are always non-critical events. 
*/ - CDEBUG(rc == -ETIMEDOUT || rc == -ENOTCONN ? D_HA : D_ERROR, - "error creating fid "LPX64" sub-object " - "on OST idx %d/%d: rc = %d\n", - set->set_oi->oi_oa->o_id, req->rq_idx, - lsm->lsm_stripe_count, rc); - if (rc > 0) { - CERROR("obd_create returned invalid err %d\n", rc); - rc = -EIO; - } - } - - cfs_spin_lock(&set->set_lock); - req->rq_stripe = cfs_atomic_read(&set->set_success); - loi = lsm->lsm_oinfo[req->rq_stripe]; - - - if (rc) { - lov_update_set(set, req, rc); - cfs_spin_unlock(&set->set_lock); - RETURN(rc); - } - - loi->loi_id = req->rq_oi.oi_oa->o_id; - loi->loi_seq = req->rq_oi.oi_oa->o_seq; - loi->loi_ost_idx = req->rq_idx; - loi_init(loi); - - if (oti && set->set_cookies) - ++oti->oti_logcookies; - if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) - set->set_cookie_sent++; - - lov_update_set(set, req, rc); - cfs_spin_unlock(&set->set_lock); - - CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", - lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); - RETURN(rc); -} - -static int create_done(struct obd_export *exp, struct lov_request_set *set, - struct lov_stripe_md **lsmp) -{ - struct lov_obd *lov = &exp->exp_obd->u.lov; - struct obd_trans_info *oti = set->set_oti; - struct obdo *src_oa = set->set_oi->oi_oa; - struct lov_request *req; - struct obdo *ret_oa = NULL; - int success, attrset = 0, rc = 0; - ENTRY; - - LASSERT(cfs_atomic_read(&set->set_completes)); - - /* try alloc objects on other osts if osc_create fails for - * exceptions: RPC failure, ENOSPC, etc */ - if (set->set_count != cfs_atomic_read(&set->set_success)) { - cfs_list_for_each_entry (req, &set->set_list, rq_link) { - if (req->rq_rc == 0) - continue; - - cfs_atomic_dec(&set->set_completes); - req->rq_complete = 0; - - rc = qos_remedy_create(set, req); - lov_update_create_set(set, req, rc); - } - } - - success = cfs_atomic_read(&set->set_success); - /* no successful creates */ - if (success == 0) - GOTO(cleanup, rc); - - if (set->set_count != success) 
{ - set->set_count = success; - qos_shrink_lsm(set); - } - - OBDO_ALLOC(ret_oa); - if (ret_oa == NULL) - GOTO(cleanup, rc = -ENOMEM); - - cfs_list_for_each_entry(req, &set->set_list, rq_link) { - if (!req->rq_complete || req->rq_rc) - continue; - lov_merge_attrs(ret_oa, req->rq_oi.oi_oa, - req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md, - req->rq_stripe, &attrset); - } - if (src_oa->o_valid & OBD_MD_FLSIZE && - ret_oa->o_size != src_oa->o_size) { - CERROR("original size "LPU64" isn't new object size "LPU64"\n", - src_oa->o_size, ret_oa->o_size); - LBUG(); - } - ret_oa->o_id = src_oa->o_id; - ret_oa->o_seq = src_oa->o_seq; - ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; - memcpy(src_oa, ret_oa, sizeof(*src_oa)); - OBDO_FREE(ret_oa); - - *lsmp = set->set_oi->oi_md; - GOTO(done, rc = 0); - -cleanup: - cfs_list_for_each_entry(req, &set->set_list, rq_link) { - struct obd_export *sub_exp; - int err = 0; - - if (!req->rq_complete || req->rq_rc) - continue; - - sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; - err = obd_destroy(NULL, sub_exp, req->rq_oi.oi_oa, NULL, oti, - NULL, NULL); - if (err) - CERROR("Failed to uncreate objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - src_oa->o_id, req->rq_oi.oi_oa->o_id, - req->rq_idx, rc); - } - if (*lsmp == NULL) - obd_free_memmd(exp, &set->set_oi->oi_md); -done: - if (oti && set->set_cookies) { - oti->oti_logcookies = set->set_cookies; - if (!set->set_cookie_sent) { - oti_free_cookies(oti); - src_oa->o_valid &= ~OBD_MD_FLCOOKIE; - } else { - src_oa->o_valid |= OBD_MD_FLCOOKIE; - } - } - RETURN(rc); -} - -int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp) -{ - int rc = 0; - ENTRY; - - if (set == NULL) - RETURN(0); - LASSERT(set->set_exp); - if (cfs_atomic_read(&set->set_completes)) - rc = create_done(set->set_exp, set, lsmp); - - lov_put_reqset(set); - RETURN(rc); -} - -int cb_create_update(void *cookie, int rc) -{ - struct obd_info *oinfo = cookie; - struct lov_request *lovreq; - - lovreq 
= container_of(oinfo, struct lov_request, rq_oi); - - if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL)) - if (lovreq->rq_idx == cfs_fail_val) - rc = -ENOTCONN; - - rc = lov_update_create_set(lovreq->rq_rqset, lovreq, rc); - if (lov_set_finished(lovreq->rq_rqset, 0)) - lov_put_reqset(lovreq->rq_rqset); - return rc; -} - -int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, - struct lov_stripe_md **lsmp, struct obdo *src_oa, - struct obd_trans_info *oti, - struct lov_request_set **reqset) -{ - struct lov_request_set *set; - int rc = 0; - ENTRY; - - OBD_ALLOC(set, sizeof(*set)); - if (set == NULL) - RETURN(-ENOMEM); - lov_init_set(set); - - set->set_exp = exp; - set->set_oi = oinfo; - set->set_oi->oi_md = *lsmp; - set->set_oi->oi_oa = src_oa; - set->set_oti = oti; - lov_get_reqset(set); - - rc = qos_prep_create(exp, set); - /* qos_shrink_lsm() may have allocated a new lsm */ - *lsmp = oinfo->oi_md; - if (rc) { - lov_fini_create_set(set, lsmp); - lov_put_reqset(set); - } else { - *reqset = set; - } - RETURN(rc); -} - static int common_attr_done(struct lov_request_set *set) { cfs_list_t *pos; @@ -1670,7 +1455,6 @@ static int cb_statfs_update(void *cookie, int rc) out_update: lov_update_statfs(osfs, lov_sfs, success); - qos_update(lov); obd_putref(lovobd); out: @@ -1678,8 +1462,6 @@ out: lov_set_finished(set, 0)) { lov_statfs_interpret(NULL, set, set->set_count != cfs_atomic_read(&set->set_success)); - if (lov->lov_qos.lq_statfs_in_progress) - qos_statfs_done(lov); } RETURN(0); diff --git a/lustre/lov/lproc_lov.c b/lustre/lov/lproc_lov.c index 01687d9..318a060 100644 --- a/lustre/lov/lproc_lov.c +++ b/lustre/lov/lproc_lov.c @@ -200,106 +200,6 @@ static int lov_rd_desc_uuid(char *page, char **start, off_t off, int count, return snprintf(page, count, "%s\n", lov->desc.ld_uuid.uuid); } -/* free priority (0-255): how badly user wants to choose empty osts */ -static int lov_rd_qos_priofree(char *page, char **start, off_t off, int count, - int *eof, void 
*data) -{ - struct obd_device *dev = (struct obd_device*) data; - struct lov_obd *lov; - - LASSERT(dev != NULL); - lov = &dev->u.lov; - *eof = 1; - return snprintf(page, count, "%d%%\n", - (lov->lov_qos.lq_prio_free * 100 + 255) >> 8); -} - -static int lov_wr_qos_priofree(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *dev = (struct obd_device *)data; - struct lov_obd *lov; - int val, rc; - LASSERT(dev != NULL); - - lov = &dev->u.lov; - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val > 100) - return -EINVAL; - lov->lov_qos.lq_prio_free = (val << 8) / 100; - lov->lov_qos.lq_dirty = 1; - lov->lov_qos.lq_reset = 1; - return count; -} - -static int lov_rd_qos_thresholdrr(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct obd_device *dev = (struct obd_device*) data; - struct lov_obd *lov; - - LASSERT(dev != NULL); - lov = &dev->u.lov; - *eof = 1; - return snprintf(page, count, "%d%%\n", - (lov->lov_qos.lq_threshold_rr * 100 + 255) >> 8); -} - -static int lov_wr_qos_thresholdrr(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *dev = (struct obd_device *)data; - struct lov_obd *lov; - int val, rc; - LASSERT(dev != NULL); - - lov = &dev->u.lov; - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val > 100 || val < 0) - return -EINVAL; - - lov->lov_qos.lq_threshold_rr = (val << 8) / 100; - lov->lov_qos.lq_dirty = 1; - return count; -} - -static int lov_rd_qos_maxage(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *dev = (struct obd_device*) data; - struct lov_obd *lov; - - LASSERT(dev != NULL); - lov = &dev->u.lov; - *eof = 1; - return snprintf(page, count, "%u Sec\n", lov->desc.ld_qos_maxage); -} - -static int lov_wr_qos_maxage(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *dev = 
(struct obd_device *)data; - struct lov_obd *lov; - int val, rc; - LASSERT(dev != NULL); - - lov = &dev->u.lov; - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val <= 0) - return -EINVAL; - lov->desc.ld_qos_maxage = val; - return count; -} - static void *lov_tgt_seq_start(struct seq_file *p, loff_t *pos) { struct obd_device *dev = p->private; @@ -378,9 +278,6 @@ struct lprocfs_vars lprocfs_lov_obd_vars[] = { { "kbytesfree", lprocfs_rd_kbytesfree, 0, 0 }, { "kbytesavail", lprocfs_rd_kbytesavail, 0, 0 }, { "desc_uuid", lov_rd_desc_uuid, 0, 0 }, - { "qos_prio_free",lov_rd_qos_priofree, lov_wr_qos_priofree, 0 }, - { "qos_threshold_rr", lov_rd_qos_thresholdrr, lov_wr_qos_thresholdrr, 0 }, - { "qos_maxage", lov_rd_qos_maxage, lov_wr_qos_maxage, 0 }, { 0 } }; diff --git a/lustre/osc/Makefile.in b/lustre/osc/Makefile.in index 13a8517..b1128bc 100644 --- a/lustre/osc/Makefile.in +++ b/lustre/osc/Makefile.in @@ -1,5 +1,5 @@ MODULES := osc -osc-objs := osc_request.o lproc_osc.o osc_create.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o osc_cache.o +osc-objs := osc_request.o lproc_osc.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o osc_cache.o EXTRA_DIST = $(osc-objs:%.o=%.c) osc_internal.h osc_cl_internal.h diff --git a/lustre/osc/autoMakefile.am b/lustre/osc/autoMakefile.am index bc2da16..03ec19b 100644 --- a/lustre/osc/autoMakefile.am +++ b/lustre/osc/autoMakefile.am @@ -38,7 +38,7 @@ if LIBLUSTRE noinst_LIBRARIES = libosc.a -libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c osc_cache.c +libosc_a_SOURCES = osc_request.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c osc_cache.c libosc_a_CPPFLAGS = $(LLCPPFLAGS) libosc_a_CFLAGS = $(LLCFLAGS) @@ -54,7 +54,6 @@ if DARWIN macos_PROGRAMS = osc osc_SOURCES = \ - osc_create.c \ osc_dev.c \ osc_object.c \ osc_page.c 
\ diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c index 5bf6332..05f13dd 100644 --- a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -305,149 +305,6 @@ static int osc_wr_grant_shrink_interval(struct file *file, const char *buffer, return count; } -static int osc_rd_create_count(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = data; - - if (obd == NULL) - return 0; - - return snprintf(page, count, "%d\n", - obd->u.cli.cl_oscc.oscc_grow_count); -} - -/** - * Set OSC creator's osc_creator::oscc_grow_count - * - * \param file proc file - * \param buffer buffer containing the value - * \param count buffer size - * \param data obd device - * - * \retval \a count - */ -static int osc_wr_create_count(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = data; - int val, rc, i; - - if (obd == NULL) - return 0; - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - /* The MDT ALWAYS needs to limit the precreate count to - * OST_MAX_PRECREATE, and the constant cannot be changed - * because it is a value shared between the OSC and OST - * that is the maximum possible number of objects that will - * ever be handled by MDT->OST recovery processing. - * - * If the OST ever gets a request to delete more orphans, - * this implies that something has gone badly on the MDT - * and the OST will refuse to delete so much data from the - * filesystem as a safety measure. 
*/ - if (val < OST_MIN_PRECREATE || val > OST_MAX_PRECREATE) - return -ERANGE; - if (val > obd->u.cli.cl_oscc.oscc_max_grow_count) - return -ERANGE; - - for (i = 1; (i << 1) <= val; i <<= 1) - ; - obd->u.cli.cl_oscc.oscc_grow_count = i; - - return count; -} - -/** - * Read OSC creator's osc_creator::oscc_max_grow_count - * - * \param page buffer to hold the returning string - * \param start - * \param off - * \param count - * \param eof - * proc read function parameters, please refer to kernel - * code fs/proc/generic.c proc_file_read() - * \param data obd device - * - * \retval number of characters printed. - */ -static int osc_rd_max_create_count(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct obd_device *obd = data; - - if (obd == NULL) - return 0; - - return snprintf(page, count, "%d\n", - obd->u.cli.cl_oscc.oscc_max_grow_count); -} - -/** - * Set OSC creator's osc_creator::oscc_max_grow_count - * - * \param file proc file - * \param buffer buffer containing the value - * \param count buffer size - * \param data obd device - * - * \retval \a count - */ -static int osc_wr_max_create_count(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = data; - int val, rc; - - if (obd == NULL) - return 0; - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val < 0) - return -ERANGE; - if (val > OST_MAX_PRECREATE) - return -ERANGE; - - if (obd->u.cli.cl_oscc.oscc_grow_count > val) - obd->u.cli.cl_oscc.oscc_grow_count = val; - - obd->u.cli.cl_oscc.oscc_max_grow_count = val; - - return count; -} - -static int osc_rd_prealloc_next_id(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct obd_device *obd = data; - - if (obd == NULL) - return 0; - - return snprintf(page, count, LPU64"\n", - obd->u.cli.cl_oscc.oscc_next_id); -} - -static int osc_rd_prealloc_last_id(char *page, char **start, off_t off, - int count, int *eof, void *data) 
-{ - struct obd_device *obd = data; - - if (obd == NULL) - return 0; - - return snprintf(page, count, LPU64"\n", - obd->u.cli.cl_oscc.oscc_last_id); -} - static int osc_rd_checksum(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -664,11 +521,6 @@ static struct lprocfs_vars lprocfs_osc_obd_vars[] = { { "cur_lost_grant_bytes", osc_rd_cur_lost_grant_bytes, 0, 0}, { "grant_shrink_interval", osc_rd_grant_shrink_interval, osc_wr_grant_shrink_interval, 0 }, - { "create_count", osc_rd_create_count, osc_wr_create_count, 0 }, - { "max_create_count", osc_rd_max_create_count, - osc_wr_max_create_count, 0}, - { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 }, - { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 }, { "checksums", osc_rd_checksum, osc_wr_checksum, 0 }, { "checksum_type", osc_rd_checksum_type, osc_wd_checksum_type, 0 }, { "resend_count", osc_rd_resend_count, osc_wr_resend_count, 0}, diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c deleted file mode 100644 index f62b112..0000000 --- a/lustre/osc/osc_create.c +++ /dev/null @@ -1,749 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). 
- * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2011, Whamcloud, Inc. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/osc/osc_create.c - * For testing and management it is treated as an obd_device, - * although * it does not export a full OBD method table (the - * requests are coming * in over the wire, so object target modules - * do not have a full * method table.) - * - * Author: Peter Braam - */ - -#define DEBUG_SUBSYSTEM S_OSC - -#ifdef __KERNEL__ -# include -#else /* __KERNEL__ */ -# include -#endif - -#ifdef __CYGWIN__ -# include -#endif - -#include -#include -#include "osc_internal.h" - -/* XXX need AT adjust ? 
*/ -#define osc_create_timeout (obd_timeout / 2) - -struct osc_create_async_args { - struct osc_creator *rq_oscc; - struct lov_stripe_md *rq_lsm; - struct obd_info *rq_oinfo; - int rq_grow_count; -}; - -static int oscc_internal_create(struct osc_creator *oscc); -static int handle_async_create(struct ptlrpc_request *req, int rc); - -static int osc_interpret_create(const struct lu_env *env, - struct ptlrpc_request *req, void *data, int rc) -{ - struct osc_create_async_args *args = ptlrpc_req_async_args(req); - struct osc_creator *oscc = args->rq_oscc; - struct ost_body *body = NULL; - struct ptlrpc_request *fake_req, *pos; - ENTRY; - - if (req->rq_repmsg) { - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL && rc == 0) - rc = -EPROTO; - } - - LASSERT(oscc && (oscc->oscc_obd != LP_POISON)); - - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_CREATING; - switch (rc) { - case 0: { - if (body) { - int diff =ostid_id(&body->oa.o_oi)- oscc->oscc_last_id; - - /* oscc_internal_create() stores the original value of - * grow_count in osc_create_async_args::rq_grow_count. - * We can't compare against oscc_grow_count directly, - * because it may have been increased while the RPC - * is in flight, so we would always find ourselves - * having created fewer objects and decreasing the - * precreate request size. 
b=18577 */ - if (diff < args->rq_grow_count) { - /* the OST has not managed to create all the - * objects we asked for */ - oscc->oscc_grow_count = max(diff, - OST_MIN_PRECREATE); - /* don't bump grow_count next time */ - oscc->oscc_flags |= OSCC_FLAG_LOW; - } else { - /* the OST is able to keep up with the work, - * we could consider increasing grow_count - * next time if needed */ - oscc->oscc_flags &= ~OSCC_FLAG_LOW; - } - oscc->oscc_last_id = ostid_id(&body->oa.o_oi); - } - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - case -EROFS: - oscc->oscc_flags |= OSCC_FLAG_RDONLY; - case -ENOSPC: - case -EFBIG: - if (rc != -EROFS) { - oscc->oscc_flags |= OSCC_FLAG_NOSPC; - if (body && rc == -ENOSPC) { - oscc->oscc_last_id = body->oa.o_id; - oscc->oscc_grow_count = OST_MIN_PRECREATE; - - if ((body->oa.o_valid & OBD_MD_FLFLAGS) && - (body->oa.o_flags & OBD_FL_NOSPC_BLK)) - oscc->oscc_flags |= OSCC_FLAG_NOSPC_BLK; - else - rc = 0; - } - } - cfs_spin_unlock(&oscc->oscc_lock); - DEBUG_REQ(D_INODE, req, "OST out of space, flagging"); - break; - case -EIO: { - /* filter always set body->oa.o_id as the last_id - * of filter (see filter_handle_precreate for detail)*/ - if (body && body->oa.o_id > oscc->oscc_last_id) - oscc->oscc_last_id = body->oa.o_id; - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - case -EINTR: - case -EWOULDBLOCK: { - /* aka EAGAIN we should not delay create if import failed - - * this avoid client stick in create and avoid race with - * delorphan */ - /* EINTR say - old create request is killed due mds<>ost - * eviction - OSCC_FLAG_RECOVERING can already set due - * IMP_DISCONN event */ - oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - /* oscc->oscc_grow_count = OST_MIN_PRECREATE; */ - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - default: { - oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - oscc->oscc_grow_count = OST_MIN_PRECREATE; - cfs_spin_unlock(&oscc->oscc_lock); - DEBUG_REQ(D_ERROR, req, - "Unknown rc %d from async create: failing oscc", rc); - 
ptlrpc_fail_import(req->rq_import, - lustre_msg_get_conn_cnt(req->rq_reqmsg)); - } - } - - CDEBUG(D_HA, "preallocated through id "LPU64" (next to use "LPU64")\n", - oscc->oscc_last_id, oscc->oscc_next_id); - - cfs_spin_lock(&oscc->oscc_lock); - cfs_list_for_each_entry_safe(fake_req, pos, - &oscc->oscc_wait_create_list, rq_list) { - if (handle_async_create(fake_req, rc) == -EAGAIN) { - oscc_internal_create(oscc); - /* sending request should be never fail because - * osc use preallocated requests pool */ - GOTO(exit_wakeup, rc); - } - } - cfs_spin_unlock(&oscc->oscc_lock); - -exit_wakeup: - cfs_waitq_signal(&oscc->oscc_waitq); - RETURN(rc); -} - -static int oscc_internal_create(struct osc_creator *oscc) -{ - struct osc_create_async_args *args; - struct ptlrpc_request *request; - struct ost_body *body; - ENTRY; - - LASSERT_SPIN_LOCKED(&oscc->oscc_lock); - - /* Do not check for a degraded OST here - bug21563/bug18539 */ - if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) { - cfs_spin_unlock(&oscc->oscc_lock); - RETURN(0); - } - - /* we need check it before OSCC_FLAG_CREATING - because need - * see lower number of precreate objects */ - if (oscc->oscc_grow_count < oscc->oscc_max_grow_count && - ((oscc->oscc_flags & OSCC_FLAG_LOW) == 0) && - (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <= - (oscc->oscc_grow_count / 4 + 1)) { - oscc->oscc_flags |= OSCC_FLAG_LOW; - oscc->oscc_grow_count *= 2; - } - - if (oscc->oscc_flags & OSCC_FLAG_CREATING) { - cfs_spin_unlock(&oscc->oscc_lock); - RETURN(0); - } - - if (oscc->oscc_grow_count > oscc->oscc_max_grow_count / 2) - oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2; - - oscc->oscc_flags |= OSCC_FLAG_CREATING; - cfs_spin_unlock(&oscc->oscc_lock); - - request = ptlrpc_request_alloc_pack(oscc->oscc_obd->u.cli.cl_import, - &RQF_OST_CREATE, - LUSTRE_OST_VERSION, OST_CREATE); - if (request == NULL) { - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_CREATING; - cfs_spin_unlock(&oscc->oscc_lock); - 
RETURN(-ENOMEM); - } - - request->rq_request_portal = OST_CREATE_PORTAL; - ptlrpc_at_set_req_timeout(request); - body = req_capsule_client_get(&request->rq_pill, &RMF_OST_BODY); - args = ptlrpc_req_async_args(request); - args->rq_oscc = oscc; - - cfs_spin_lock(&oscc->oscc_lock); - args->rq_grow_count = oscc->oscc_grow_count; - - if (likely(fid_seq_is_mdt(oscc->oscc_oa.o_seq))) { - body->oa.o_oi.oi_seq = oscc->oscc_oa.o_seq; - body->oa.o_oi.oi_id = oscc->oscc_last_id + - oscc->oscc_grow_count; - } else { - /*Just warning here currently, since not sure how fid-on-ost - *will be implemented here */ - CWARN("o_seq: "LPU64" is not indicate any MDTs.\n", - oscc->oscc_oa.o_seq); - } - cfs_spin_unlock(&oscc->oscc_lock); - - body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; - CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n", - body->oa.o_id, oscc->oscc_last_id); - - /* we should not resend create request - anyway we will have delorphan - * and kill these objects */ - request->rq_no_delay = request->rq_no_resend = 1; - ptlrpc_request_set_replen(request); - - request->rq_interpret_reply = osc_interpret_create; - ptlrpcd_add_req(request, PDL_POLICY_ROUND, -1); - - RETURN(0); -} - -static int oscc_has_objects_nolock(struct osc_creator *oscc, int count) -{ - return ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count); -} - -static int oscc_has_objects(struct osc_creator *oscc, int count) -{ - int have_objs; - - cfs_spin_lock(&oscc->oscc_lock); - have_objs = oscc_has_objects_nolock(oscc, count); - cfs_spin_unlock(&oscc->oscc_lock); - - return have_objs; -} - -static int oscc_wait_for_objects(struct osc_creator *oscc, int count) -{ - int have_objs; - int ost_unusable; - - ost_unusable = oscc->oscc_obd->u.cli.cl_import->imp_invalid; - - cfs_spin_lock(&oscc->oscc_lock); - ost_unusable |= (OSCC_FLAG_NOSPC | OSCC_FLAG_RDONLY | - OSCC_FLAG_EXITING) & oscc->oscc_flags; - have_objs = oscc_has_objects_nolock(oscc, count); - - if (!ost_unusable && !have_objs) - 
/* they release lock himself */ - have_objs = oscc_internal_create(oscc); - else - cfs_spin_unlock(&oscc->oscc_lock); - - return have_objs || ost_unusable; -} - -static int oscc_precreate(struct osc_creator *oscc) -{ - struct l_wait_info lwi; - int rc = 0; - ENTRY; - - if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2)) - RETURN(0); - - /* we should be not block forever - because client's create rpc can - * stick in mds for long time and forbid client reconnect */ - lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(osc_create_timeout)), - NULL, NULL); - - rc = l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi); - RETURN(rc); -} - -static int oscc_in_sync(struct osc_creator *oscc) -{ - int sync; - - cfs_spin_lock(&oscc->oscc_lock); - sync = oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS; - cfs_spin_unlock(&oscc->oscc_lock); - - return sync; -} - -/* decide if the OST has remaining object, return value : - 0 : the OST has remaining objects, may or may not send precreation RPC. - 1 : the OST has no remaining object, and the sent precreation RPC - has not been completed yet. 
- 2 : the OST has no remaining object, and will not get any for - a potentially very long time - 1000 : unusable - */ -int osc_precreate(struct obd_export *exp) -{ - struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; - struct obd_import *imp = exp->exp_imp_reverse; - int rc; - ENTRY; - - LASSERT(oscc != NULL); - if (imp != NULL && imp->imp_deactive) - GOTO(out_nolock, rc = 1000); - - /* Handle critical states first */ - cfs_spin_lock(&oscc->oscc_lock); - if (oscc->oscc_flags & OSCC_FLAG_NOSPC_BLK || - oscc->oscc_flags & OSCC_FLAG_RDONLY || - oscc->oscc_flags & OSCC_FLAG_EXITING) - GOTO(out, rc = 1000); - - if ((oscc->oscc_flags & OSCC_FLAG_RECOVERING) || - (oscc->oscc_flags & OSCC_FLAG_DEGRADED)) - GOTO(out, rc = 2); - - if (oscc_has_objects_nolock(oscc, oscc->oscc_grow_count / 2)) - GOTO(out, rc = 0); - - /* Return 0, if we have at least one object - bug 22884 */ - rc = oscc_has_objects_nolock(oscc, 1) ? 0 : 1; - - if (oscc->oscc_flags & OSCC_FLAG_NOSPC) - GOTO(out, (rc == 0) ? 0 : 1000); - - /* Do not check for OSCC_FLAG_CREATING flag here, let - * osc_precreate() call oscc_internal_create() and - * adjust oscc_grow_count bug21563 */ - if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) - GOTO(out, rc); - - if (oscc_internal_create(oscc)) - GOTO(out_nolock, rc = 1000); - - RETURN(rc); -out: - cfs_spin_unlock(&oscc->oscc_lock); -out_nolock: - return rc; -} - -static int handle_async_create(struct ptlrpc_request *req, int rc) -{ - struct osc_create_async_args *args = ptlrpc_req_async_args(req); - struct osc_creator *oscc = args->rq_oscc; - struct lov_stripe_md *lsm = args->rq_lsm; - struct obd_info *oinfo = args->rq_oinfo; - struct obdo *oa = oinfo->oi_oa; - - LASSERT_SPIN_LOCKED(&oscc->oscc_lock); - - if(rc) - GOTO(out_wake, rc); - - /* Handle the critical type errors first. - * Should we also test cl_import state as well ? 
*/ - if (oscc->oscc_flags & OSCC_FLAG_EXITING) - GOTO(out_wake, rc = -EIO); - - if (oscc->oscc_flags & OSCC_FLAG_NOSPC_BLK) - GOTO(out_wake, rc = -ENOSPC); - - if (oscc->oscc_flags & OSCC_FLAG_RDONLY) - GOTO(out_wake, rc = -EROFS); - - /* should be try wait until recovery finished */ - if((oscc->oscc_flags & OSCC_FLAG_RECOVERING) || - (oscc->oscc_flags & OSCC_FLAG_DEGRADED)) - RETURN(-EAGAIN); - - if (oscc_has_objects_nolock(oscc, 1)) { - memcpy(oa, &oscc->oscc_oa, sizeof(*oa)); - oa->o_id = oscc->oscc_next_id; - lsm->lsm_object_id = oscc->oscc_next_id; - oscc->oscc_next_id++; - - CDEBUG(D_RPCTRACE, " set oscc_next_id = "LPU64"\n", - oscc->oscc_next_id); - GOTO(out_wake, rc = 0); - } - - /* we don't have objects now - continue wait */ - RETURN(-EAGAIN); - -out_wake: - - rc = oinfo->oi_cb_up(oinfo, rc); - ptlrpc_fakereq_finished(req); - - RETURN(rc); -} - -static int async_create_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, - int rc) -{ - struct osc_create_async_args *args = ptlrpc_req_async_args(req); - struct osc_creator *oscc = args->rq_oscc; - int ret; - - cfs_spin_lock(&oscc->oscc_lock); - ret = handle_async_create(req, rc); - cfs_spin_unlock(&oscc->oscc_lock); - - return ret; -} - -int osc_create_async(struct obd_export *exp, struct obd_info *oinfo, - struct lov_stripe_md **ea, struct obd_trans_info *oti) -{ - int rc; - struct ptlrpc_request *fake_req; - struct osc_create_async_args *args; - struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; - struct obdo *oa = oinfo->oi_oa; - ENTRY; - - if ((oa->o_valid & OBD_MD_FLGROUP) && !fid_seq_is_mdt(oa->o_seq)) { - rc = osc_real_create(exp, oinfo->oi_oa, ea, oti); - rc = oinfo->oi_cb_up(oinfo, rc); - RETURN(rc); - } - - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_RECREATE_OBJS) { - rc = osc_real_create(exp, oinfo->oi_oa, ea, oti); - rc = oinfo->oi_cb_up(oinfo, rc); - RETURN(rc); - } - - LASSERT((*ea) != NULL); - - fake_req = 
ptlrpc_prep_fakereq(oscc->oscc_obd->u.cli.cl_import, - osc_create_timeout, - async_create_interpret); - if (fake_req == NULL) { - rc = oinfo->oi_cb_up(oinfo, -ENOMEM); - RETURN(-ENOMEM); - } - - args = ptlrpc_req_async_args(fake_req); - CLASSERT(sizeof(*args) <= sizeof(fake_req->rq_async_args)); - - args->rq_oscc = oscc; - args->rq_lsm = *ea; - args->rq_oinfo = oinfo; - - cfs_spin_lock(&oscc->oscc_lock); - /* try fast path */ - rc = handle_async_create(fake_req, 0); - if (rc == -EAGAIN) { - /* We don't have any objects, wait until we get a reply. */ - ptlrpcd_add_req(fake_req, PDL_POLICY_ROUND, -1); - cfs_list_add(&fake_req->rq_list, - &oscc->oscc_wait_create_list); - cfs_spin_unlock(&oscc->oscc_lock); - /* EAGAIN mean - request is delayed */ - rc = 0; - } else { - cfs_spin_unlock(&oscc->oscc_lock); - /* need free request if was error hit or - * objects already allocated */ - ptlrpc_req_finished(fake_req); - } - - RETURN(rc); -} - -int osc_create(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md **ea, - struct obd_trans_info *oti) -{ - struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; - struct obd_import *imp = exp->exp_obd->u.cli.cl_import; - struct lov_stripe_md *lsm; - int del_orphan = 0, rc = 0; - ENTRY; - - LASSERT(oa); - LASSERT(ea); - LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_RECREATE_OBJS) { - RETURN(osc_real_create(exp, oa, ea, oti)); - } - - if (!fid_seq_is_mdt(oa->o_seq)) - RETURN(osc_real_create(exp, oa, ea, oti)); - - /* this is the special case where create removes orphans */ - if (oa->o_valid & OBD_MD_FLFLAGS && - oa->o_flags == OBD_FL_DELORPHAN) { - cfs_spin_lock(&oscc->oscc_lock); - if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) { - cfs_spin_unlock(&oscc->oscc_lock); - RETURN(-EBUSY); - } - if (!(oscc->oscc_flags & OSCC_FLAG_RECOVERING)) { - cfs_spin_unlock(&oscc->oscc_lock); - RETURN(0); - } - - oscc->oscc_flags |= 
OSCC_FLAG_SYNC_IN_PROGRESS; - /* seting flag LOW we prevent extra grow precreate size - * and enforce use last assigned size */ - oscc->oscc_flags |= OSCC_FLAG_LOW; - cfs_spin_unlock(&oscc->oscc_lock); - CDEBUG(D_HA, "%s: oscc recovery started - delete to "LPU64"\n", - oscc->oscc_obd->obd_name, oscc->oscc_next_id - 1); - - del_orphan = 1; - - /* delete from next_id on up */ - oa->o_valid |= OBD_MD_FLID; - oa->o_id = oscc->oscc_next_id - 1; - - rc = osc_real_create(exp, oa, ea, NULL); - - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS; - if (rc == 0 || rc == -ENOSPC) { - struct obd_connect_data *ocd; - - if (rc == -ENOSPC) { - oscc->oscc_flags |= OSCC_FLAG_NOSPC; - if ((oa->o_valid & OBD_MD_FLFLAGS) && - (oa->o_flags & OBD_FL_NOSPC_BLK)) - oscc->oscc_flags |= OSCC_FLAG_NOSPC_BLK; - } - oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; - - oscc->oscc_last_id = oa->o_id; - ocd = &imp->imp_connect_data; - if (ocd->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) { - /* - * The OST reports back in oa->o_id from where - * we should restart in order to skip orphan - * objects - */ - CDEBUG(D_HA, "%s: Skip orphan set, reset last " - "objid\n", oscc->oscc_obd->obd_name); - oscc->oscc_next_id = oa->o_id + 1; - } - - /* sanity check for next objid. 
see bug 17025 */ - LASSERT(oscc->oscc_next_id == oa->o_id + 1); - - CDEBUG(D_HA, "%s: oscc recovery finished, last_id: " - LPU64", rc: %d\n", oscc->oscc_obd->obd_name, - oscc->oscc_last_id, rc); - } else { - CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n", - oscc->oscc_obd->obd_name, rc); - } - - cfs_waitq_signal(&oscc->oscc_waitq); - cfs_spin_unlock(&oscc->oscc_lock); - - if (rc < 0) - RETURN(rc); - } - - lsm = *ea; - if (lsm == NULL) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc < 0) - RETURN(rc); - } - - while (1) { - if (oscc_in_sync(oscc)) - CDEBUG(D_HA,"%s: oscc recovery in progress, waiting\n", - oscc->oscc_obd->obd_name); - - rc = oscc_precreate(oscc); - if (rc) - CDEBUG(D_HA,"%s: error create %d\n", - oscc->oscc_obd->obd_name, rc); - - cfs_spin_lock(&oscc->oscc_lock); - - /* wakeup but recovery did not finished */ - if ((oscc->oscc_obd->u.cli.cl_import->imp_invalid) || - (oscc->oscc_flags & OSCC_FLAG_RECOVERING)) { - rc = -EIO; - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - - if (oscc->oscc_flags & OSCC_FLAG_NOSPC_BLK) { - rc = -ENOSPC; - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - - if (oscc->oscc_flags & OSCC_FLAG_RDONLY) { - rc = -EROFS; - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - - // Should we report -EIO error ? - if (oscc->oscc_flags & OSCC_FLAG_EXITING) { - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - - /** - * If this is DELORPHAN process, no need create object here, - * otherwise this will create a gap of object id, and MDS - * might create some orphan log (mds_lov_update_objids), then - * remove objects wrongly on OST. Bug 21379. 
- */ - if (oa->o_valid & OBD_MD_FLFLAGS && - oa->o_flags == OBD_FL_DELORPHAN) { - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - - if (oscc_has_objects_nolock(oscc, 1)) { - memcpy(oa, &oscc->oscc_oa, sizeof(*oa)); - oa->o_id = oscc->oscc_next_id; - lsm->lsm_object_id = oscc->oscc_next_id; - *ea = lsm; - oscc->oscc_next_id++; - cfs_spin_unlock(&oscc->oscc_lock); - - CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n", - exp->exp_obd->obd_name, oscc->oscc_next_id); - break; - } - - if (oscc->oscc_flags & OSCC_FLAG_NOSPC) { - rc = -ENOSPC; - cfs_spin_unlock(&oscc->oscc_lock); - break; - } - - cfs_spin_unlock(&oscc->oscc_lock); - } - - if (rc == 0) { - CDEBUG(D_INFO, "%s: returning objid "LPU64"\n", - obd2cli_tgt(oscc->oscc_obd), lsm->lsm_object_id); - } else { - if (*ea == NULL) - obd_free_memmd(exp, &lsm); - if (del_orphan != 0 && rc != -EIO) - /* Ignore non-IO precreate error for clear orphan */ - rc = 0; - } - RETURN(rc); -} - -void oscc_init(struct obd_device *obd) -{ - struct osc_creator *oscc; - - if (obd == NULL) - return; - - oscc = &obd->u.cli.cl_oscc; - - memset(oscc, 0, sizeof(*oscc)); - - cfs_waitq_init(&oscc->oscc_waitq); - cfs_spin_lock_init(&oscc->oscc_lock); - oscc->oscc_obd = obd; - oscc->oscc_grow_count = OST_MIN_PRECREATE; - oscc->oscc_max_grow_count = OST_MAX_PRECREATE; - - oscc->oscc_next_id = 2; - oscc->oscc_last_id = 1; - oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - - CFS_INIT_LIST_HEAD(&oscc->oscc_wait_create_list); - - /* XXX the export handle should give the oscc the last object */ - /* oed->oed_oscc.oscc_last_id = exph->....; */ -} - -void oscc_fini(struct obd_device *obd) -{ - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - ENTRY; - - - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; - oscc->oscc_flags |= OSCC_FLAG_EXITING; - cfs_spin_unlock(&oscc->oscc_lock); -} diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index f9bcae1..d3cfc8d 100644 --- a/lustre/osc/osc_internal.h +++ 
b/lustre/osc/osc_internal.h @@ -85,25 +85,11 @@ struct osc_cache_waiter { int ocw_rc; }; -#define OSCC_FLAG_RECOVERING 0x01 -#define OSCC_FLAG_CREATING 0x02 -#define OSCC_FLAG_NOSPC 0x04 /* can't create more objects on OST */ -#define OSCC_FLAG_SYNC_IN_PROGRESS 0x08 /* only allow one thread to sync */ -#define OSCC_FLAG_LOW 0x10 -#define OSCC_FLAG_EXITING 0x20 -#define OSCC_FLAG_DEGRADED 0x40 -#define OSCC_FLAG_RDONLY 0x80 -#define OSCC_FLAG_NOSPC_BLK 0x100 /* no more block space on OST */ - -int osc_precreate(struct obd_export *exp); int osc_create(const struct lu_env *env, struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti); -int osc_create_async(struct obd_export *exp, struct obd_info *oinfo, - struct lov_stripe_md **ea, struct obd_trans_info *oti); int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti); -void oscc_init(struct obd_device *obd); void osc_wake_cache_waiters(struct client_obd *cli); int osc_shrink_grant_to_target(struct client_obd *cli, long target); void osc_update_next_shrink(struct client_obd *cli); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index d037f66..9dfd08f 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -704,6 +704,31 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } +int osc_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md **ea, + struct obd_trans_info *oti) +{ + int rc = 0; + ENTRY; + + LASSERT(oa); + LASSERT(ea); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + oa->o_flags == OBD_FL_RECREATE_OBJS) { + RETURN(osc_real_create(exp, oa, ea, oti)); + } + + if (!fid_seq_is_mdt(oa->o_seq)) + RETURN(osc_real_create(exp, oa, ea, oti)); + + /* we should not get here anymore */ + LBUG(); + + RETURN(rc); +} + /* Destroy requests can be async always on the client, and we don't even really * care 
about the return code since the client cannot do anything at all about * a destroy failure. @@ -2678,9 +2703,7 @@ static int osc_statfs_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_async_args *aa, int rc) { - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; struct obd_statfs *msfs; - __u64 used; ENTRY; if (rc == -EBADR) @@ -2703,51 +2726,6 @@ static int osc_statfs_interpret(const struct lu_env *env, GOTO(out, rc = -EPROTO); } - /* Reinitialize the RDONLY and DEGRADED flags at the client - * on each statfs, so they don't stay set permanently. */ - cfs_spin_lock(&cli->cl_oscc.oscc_lock); - - if (unlikely(msfs->os_state & OS_STATE_DEGRADED)) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED; - else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED)) - cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED; - - if (unlikely(msfs->os_state & OS_STATE_READONLY)) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY; - else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY)) - cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY; - - /* Add a bit of hysteresis so this flag isn't continually flapping, - * and ensure that new files don't get extremely fragmented due to - * only a small amount of available space in the filesystem. - * We want to set the NOSPC flag when there is less than ~0.1% free - * and clear it when there is at least ~0.2% free space, so: - * avail < ~0.1% max max = avail + used - * 1025 * avail < avail + used used = blocks - free - * 1024 * avail < used - * 1024 * avail < blocks - free - * avail < ((blocks - free) >> 10) - * - * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to - * lose that amount of space so in those cases we report no space left - * if their is less than 1 GB left. 
*/ - used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30); - if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) && - ((msfs->os_ffree < 32) || (msfs->os_bavail < used)))) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC; - else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) && - (msfs->os_ffree > 64) && - (msfs->os_bavail > (used << 1)))) { - cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC | - OSCC_FLAG_NOSPC_BLK); - } - - if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) && - (msfs->os_bavail < used))) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK; - - cfs_spin_unlock(&cli->cl_oscc.oscc_lock); - *aa->aa_oi->oi_osfs = *msfs; out: rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); @@ -3109,40 +3087,6 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } -static int osc_setinfo_mds_connect_import(struct obd_import *imp) -{ - struct llog_ctxt *ctxt; - int rc = 0; - ENTRY; - - ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT); - if (ctxt) { - rc = llog_initiator_connect(ctxt); - llog_ctxt_put(ctxt); - } else { - /* XXX return an error? skip setting below flags? 
*/ - } - - cfs_spin_lock(&imp->imp_lock); - imp->imp_server_timeout = 1; - imp->imp_pingable = 1; - cfs_spin_unlock(&imp->imp_lock); - CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd)); - - RETURN(rc); -} - -static int osc_setinfo_mds_conn_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *aa, int rc) -{ - ENTRY; - if (rc != 0) - RETURN(rc); - - RETURN(osc_setinfo_mds_connect_import(req->rq_import)); -} - static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, obd_count keylen, void *key, obd_count vallen, void *val, struct ptlrpc_request_set *set) @@ -3156,32 +3100,6 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); - if (KEY_IS(KEY_NEXT_ID)) { - obd_id new_val; - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - if (vallen != sizeof(obd_id)) - RETURN(-ERANGE); - if (val == NULL) - RETURN(-EINVAL); - - if (vallen != sizeof(obd_id)) - RETURN(-EINVAL); - - /* avoid race between allocate new object and set next id - * from ll_sync thread */ - cfs_spin_lock(&oscc->oscc_lock); - new_val = *((obd_id*)val) + 1; - if (new_val > oscc->oscc_next_id) - oscc->oscc_next_id = new_val; - cfs_spin_unlock(&oscc->oscc_lock); - CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n", - exp->exp_obd->obd_name, - obd->u.cli.cl_oscc.oscc_next_id); - - RETURN(0); - } - if (KEY_IS(KEY_CHECKSUM)) { if (vallen != sizeof(int)) RETURN(-EINVAL); @@ -3259,15 +3177,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL); memcpy(tmp, val, vallen); - if (KEY_IS(KEY_MDS_CONN)) { - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - oscc->oscc_oa.o_seq = (*(__u32 *)val); - oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP; - LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq); - req->rq_no_delay = req->rq_no_resend = 1; - req->rq_interpret_reply = osc_setinfo_mds_conn_interpret; - } else if 
(KEY_IS(KEY_GRANT_SHRINK)) { + if (KEY_IS(KEY_GRANT_SHRINK)) { struct osc_grant_args *aa; struct obdo *oa; @@ -3402,14 +3312,6 @@ static int osc_import_event(struct obd_device *obd, switch (event) { case IMP_EVENT_DISCON: { - /* Only do this on the MDS OSC's */ - if (imp->imp_server_timeout) { - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - cfs_spin_unlock(&oscc->oscc_lock); - } cli = &obd->u.cli; client_obd_list_lock(&cli->cl_loi_list_lock); cli->cl_avail_grant = 0; @@ -3441,15 +3343,6 @@ static int osc_import_event(struct obd_device *obd, break; } case IMP_EVENT_ACTIVE: { - /* Only do this on the MDS OSC's */ - if (imp->imp_server_timeout) { - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC | - OSCC_FLAG_NOSPC_BLK); - cfs_spin_unlock(&oscc->oscc_lock); - } rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); break; } @@ -3550,7 +3443,6 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_lprocfs_register_obd(obd); } - oscc_init(obd); /* We need to allocate a few requests more, because * brw_interpret tries to create new requests before freeing * previous ones, Ideally we want to have 2x max_rpcs_in_flight @@ -3684,9 +3576,7 @@ struct obd_ops osc_obd_ops = { .o_statfs_async = osc_statfs_async, .o_packmd = osc_packmd, .o_unpackmd = osc_unpackmd, - .o_precreate = osc_precreate, .o_create = osc_create, - .o_create_async = osc_create_async, .o_destroy = osc_destroy, .o_getattr = osc_getattr, .o_getattr_async = osc_getattr_async, diff --git a/lustre/osp/osp_precreate.c b/lustre/osp/osp_precreate.c index 5df08e9..4adf222 100644 --- a/lustre/osp/osp_precreate.c +++ b/lustre/osp/osp_precreate.c @@ -469,6 +469,20 @@ void osp_pre_update_status(struct osp_device *d, int rc) if (rc) goto out; + /* Add a bit of hysteresis so this flag isn't continually flapping, + * and ensure that new 
files don't get extremely fragmented due to + * only a small amount of available space in the filesystem. + * We want to set the NOSPC flag when there is less than ~0.1% free + * and clear it when there is at least ~0.2% free space, so: + * avail < ~0.1% max max = avail + used + * 1025 * avail < avail + used used = blocks - free + * 1024 * avail < used + * 1024 * avail < blocks - free + * avail < ((blocks - free) >> 10) + * + * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to + * lose that amount of space so in those cases we report no space left + * if their is less than 1 GB left. */ if (likely(msfs->os_type)) { used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index f8561b9..4150a78 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -810,87 +810,6 @@ ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count, EXPORT_SYMBOL(ptlrpc_prep_req); /** - * Allocate "fake" request that would not be sent anywhere in the end. - * Only used as a hack because we have no other way of performing - * async actions in lustre between layers. - * Used on MDS to request object preallocations from more than one OST at a - * time. 
- */ -struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp, - unsigned int timeout, - ptlrpc_interpterer_t interpreter) -{ - struct ptlrpc_request *request = NULL; - ENTRY; - - OBD_ALLOC(request, sizeof(*request)); - if (!request) { - CERROR("request allocation out of memory\n"); - RETURN(NULL); - } - - request->rq_send_state = LUSTRE_IMP_FULL; - request->rq_type = PTL_RPC_MSG_REQUEST; - request->rq_import = class_import_get(imp); - request->rq_export = NULL; - request->rq_import_generation = imp->imp_generation; - - request->rq_timeout = timeout; - request->rq_sent = cfs_time_current_sec(); - request->rq_deadline = request->rq_sent + timeout; - request->rq_reply_deadline = request->rq_deadline; - request->rq_interpret_reply = interpreter; - request->rq_phase = RQ_PHASE_RPC; - request->rq_next_phase = RQ_PHASE_INTERPRET; - /* don't want reply */ - request->rq_receiving_reply = 0; - request->rq_must_unlink = 0; - request->rq_no_delay = request->rq_no_resend = 1; - request->rq_fake = 1; - - cfs_spin_lock_init(&request->rq_lock); - CFS_INIT_LIST_HEAD(&request->rq_list); - CFS_INIT_LIST_HEAD(&request->rq_replay_list); - CFS_INIT_LIST_HEAD(&request->rq_set_chain); - CFS_INIT_LIST_HEAD(&request->rq_history_list); - CFS_INIT_LIST_HEAD(&request->rq_exp_list); - cfs_waitq_init(&request->rq_reply_waitq); - cfs_waitq_init(&request->rq_set_waitq); - - request->rq_xid = ptlrpc_next_xid(); - cfs_atomic_set(&request->rq_refcount, 1); - - RETURN(request); -} -EXPORT_SYMBOL(ptlrpc_prep_fakereq); - -/** - * Indicate that processing of "fake" request is finished. 
- */ -void ptlrpc_fakereq_finished(struct ptlrpc_request *req) -{ - struct ptlrpc_request_set *set = req->rq_set; - int wakeup = 0; - - /* hold ref on the request to prevent others (ptlrpcd) to free it */ - ptlrpc_request_addref(req); - cfs_list_del_init(&req->rq_list); - - /* if we kill request before timeout - need adjust counter */ - if (req->rq_phase == RQ_PHASE_RPC && set != NULL && - cfs_atomic_dec_and_test(&set->set_remaining)) - wakeup = 1; - - ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE); - - /* Only need to call wakeup once when to be empty. */ - if (wakeup) - cfs_waitq_signal(&set->set_waitq); - ptlrpc_req_finished(req); -} -EXPORT_SYMBOL(ptlrpc_fakereq_finished); - -/** * Allocate and initialize new request set structure. * Returns a pointer to the newly allocated set structure or NULL on error. */ @@ -1160,10 +1079,6 @@ static int ptlrpc_console_allow(struct ptlrpc_request *req) __u32 opc; int err; - /* Fake requests include no rq_reqmsg */ - if (req->rq_fake) - return 0; - LASSERT(req->rq_reqmsg != NULL); opc = lustre_msg_get_opc(req->rq_reqmsg); @@ -1901,9 +1816,8 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) req->rq_timedout = 1; cfs_spin_unlock(&req->rq_lock); - DEBUG_REQ(req->rq_fake ? D_INFO : D_WARNING, req, "Request " - " sent has %s: [sent "CFS_DURATION_T"/" - "real "CFS_DURATION_T"]", + DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent "CFS_DURATION_T + "/real "CFS_DURATION_T"]", req->rq_net_err ? "failed due to network error" : ((req->rq_real_sent == 0 || cfs_time_before(req->rq_real_sent, req->rq_sent) || @@ -1925,9 +1839,6 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) RETURN(1); } - if (req->rq_fake) - RETURN(1); - cfs_atomic_inc(&imp->imp_timeouts); /* The DLM server doesn't want recovery run on its imports. */ -- 1.8.3.1