X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ffid%2Ffid_request.c;h=6facae1f111a73a85b7be62bda4c8cf4c8242dcb;hb=6e6357dbf9a14aaea459f460dbe4f93e52c814d4;hp=26692dc2e8d13ad3d154a5df75846c1c5bd272d9;hpb=d016a086b68ca4af2dcbfefc3917b9291efd8d62;p=fs%2Flustre-release.git diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index 26692dc..6facae1 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -29,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,9 +40,6 @@ * Author: Yury Umanets */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_FID #ifdef __KERNEL__ @@ -69,79 +64,87 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *output, __u32 opc, const char *opcname) { - struct obd_export *exp = seq->lcs_exp; - struct ptlrpc_request *req; - struct lu_seq_range *out, *in; - __u32 *op; - int rc; - ENTRY; - - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, - LUSTRE_MDS_VERSION, SEQ_QUERY); - if (req == NULL) - RETURN(-ENOMEM); - - /* Init operation code */ - op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); - *op = opc; - - /* Zero out input range, this is not recovery yet. */ - in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); - range_init(in); - - ptlrpc_request_set_replen(req); - - if (seq->lcs_type == LUSTRE_SEQ_METADATA) { - req->rq_request_portal = SEQ_METADATA_PORTAL; - in->lsr_flags = LU_SEQ_RANGE_MDT; - } else { - LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA, - "unknown lcs_type %u\n", seq->lcs_type); - req->rq_request_portal = SEQ_DATA_PORTAL; - in->lsr_flags = LU_SEQ_RANGE_OST; - } - - if (opc == SEQ_ALLOC_SUPER) { - /* Update index field of *in, it is required for - * FLD update on super sequence allocator node. */ - in->lsr_index = seq->lcs_space.lsr_index; - req->rq_request_portal = SEQ_CONTROLLER_PORTAL; - } else { - LASSERTF(opc == SEQ_ALLOC_META, - "unknown opcode %u\n, opc", opc); - } - - ptlrpc_at_set_req_timeout(req); - - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - - if (rc) - GOTO(out_req, rc); - - out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); - *output = *out; - - if (!range_is_sane(output)) { - CERROR("%s: Invalid range received from server: " - DRANGE"\n", seq->lcs_name, PRANGE(output)); - GOTO(out_req, rc = -EINVAL); - } - - if (range_is_exhausted(output)) { - CERROR("%s: Range received from server is exhausted: " - DRANGE"]\n", seq->lcs_name, PRANGE(output)); - GOTO(out_req, rc = -EINVAL); - } - - CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n", - seq->lcs_name, opcname, PRANGE(output)); - - EXIT; + struct obd_export *exp = seq->lcs_exp; + struct ptlrpc_request *req; + struct lu_seq_range *out, *in; + __u32 *op; + unsigned int debug_mask; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, + LUSTRE_MDS_VERSION, SEQ_QUERY); + if (req == NULL) + RETURN(-ENOMEM); + + /* Init operation code */ + op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); + *op = opc; + + /* Zero out input range, this is not recovery yet. */ + in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); + range_init(in); + + ptlrpc_request_set_replen(req); + + in->lsr_index = seq->lcs_space.lsr_index; + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + in->lsr_flags = LU_SEQ_RANGE_MDT; + else + in->lsr_flags = LU_SEQ_RANGE_OST; + + if (opc == SEQ_ALLOC_SUPER) { + req->rq_request_portal = SEQ_CONTROLLER_PORTAL; + req->rq_reply_portal = MDC_REPLY_PORTAL; + /* During allocating super sequence for data object, + * the current thread might hold the export of MDT0(MDT0 + * precreating objects on this OST), and it will send the + * request to MDT0 here, so we can not keep resending the + * request here, otherwise if MDT0 is failed(umounted), + * it can not release the export of MDT0 */ + if (seq->lcs_type == LUSTRE_SEQ_DATA) + req->rq_no_delay = req->rq_no_resend = 1; + debug_mask = D_CONSOLE; + } else { + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + req->rq_request_portal = SEQ_METADATA_PORTAL; + else + req->rq_request_portal = SEQ_DATA_PORTAL; + debug_mask = D_INFO; + } + + ptlrpc_at_set_req_timeout(req); + + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + rc = ptlrpc_queue_wait(req); + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + if (rc) + GOTO(out_req, rc); + + out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); + *output = *out; + + if (!range_is_sane(output)) { + CERROR("%s: Invalid range received from server: " + DRANGE"\n", seq->lcs_name, PRANGE(output)); + GOTO(out_req, rc = -EINVAL); + } + + if (range_is_exhausted(output)) { + CERROR("%s: Range received from server is exhausted: " + DRANGE"]\n", seq->lcs_name, PRANGE(output)); + GOTO(out_req, rc = -EINVAL); + } + + CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n", + seq->lcs_name, opcname, PRANGE(output)); + + EXIT; out_req: - ptlrpc_req_finished(req); - return rc; + ptlrpc_req_finished(req); + return rc; } /* Request sequence-controller node to allocate new super-sequence. */ @@ -151,7 +154,7 @@ int seq_client_alloc_super(struct lu_client_seq *seq, int rc; ENTRY; - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); #ifdef __KERNEL__ if (seq->lcs_srv) { @@ -160,12 +163,19 @@ int seq_client_alloc_super(struct lu_client_seq *seq, env); } else { #endif - rc = seq_client_rpc(seq, &seq->lcs_space, + /* Check whether the connection to seq controller has been + * setup (lcs_exp != NULL) */ + if (seq->lcs_exp == NULL) { + mutex_unlock(&seq->lcs_mutex); + RETURN(-EINPROGRESS); + } + + rc = seq_client_rpc(seq, &seq->lcs_space, SEQ_ALLOC_SUPER, "super"); #ifdef __KERNEL__ } #endif - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } @@ -182,8 +192,14 @@ static int seq_client_alloc_meta(const struct lu_env *env, rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env); } else { #endif - rc = seq_client_rpc(seq, &seq->lcs_space, - SEQ_ALLOC_META, "meta"); + do { + /* If meta server return -EINPROGRESS or EAGAIN, + * it means meta server might not be ready to + * allocate super sequence from sequence controller + * (MDT0)yet */ + rc = seq_client_rpc(seq, &seq->lcs_space, + SEQ_ALLOC_META, "meta"); + } while (rc == -EINPROGRESS || rc == -EAGAIN); #ifdef __KERNEL__ } #endif @@ -229,29 +245,31 @@ static int seq_fid_alloc_prep(struct lu_client_seq *seq, if (seq->lcs_update) { cfs_waitq_add(&seq->lcs_waitq, link); cfs_set_current_state(CFS_TASK_UNINT); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); cfs_waitq_wait(link, CFS_TASK_UNINT); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); cfs_waitq_del(&seq->lcs_waitq, link); cfs_set_current_state(CFS_TASK_RUNNING); return -EAGAIN; } ++seq->lcs_update; - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); return 0; } static void seq_fid_alloc_fini(struct lu_client_seq *seq) { LASSERT(seq->lcs_update == 1); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); --seq->lcs_update; cfs_waitq_signal(&seq->lcs_waitq); } -/* Allocate the whole seq to the caller*/ +/** + * Allocate the whole seq to the caller. + **/ int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq, seqno_t *seqnr) { @@ -259,7 +277,7 @@ int seq_client_get_seq(const struct lu_env *env, int rc; LASSERT(seqnr != NULL); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); cfs_waitlink_init(&link); while (1) { @@ -273,25 +291,28 @@ int seq_client_get_seq(const struct lu_env *env, CERROR("%s: Can't allocate new sequence, " "rc %d\n", seq->lcs_name, rc); seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); return rc; } CDEBUG(D_INFO, "%s: allocate sequence " "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); - /*Since the caller require the whole seq, - *so marked this seq to be used*/ - seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH; - seq->lcs_fid.f_seq = *seqnr; - seq->lcs_fid.f_ver = 0; + /* Since the caller require the whole seq, + * so marked this seq to be used */ + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; + seq->lcs_fid.f_seq = *seqnr; + seq->lcs_fid.f_ver = 0; /* * Inform caller that sequence switch is performed to allow it * to setup FLD for it. */ seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); return rc; } @@ -309,7 +330,10 @@ int seq_client_alloc_fid(const struct lu_env *env, LASSERT(fid != NULL); cfs_waitlink_init(&link); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); + + if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) + seq->lcs_fid.f_oid = seq->lcs_width; while (1) { seqno_t seqnr; @@ -331,7 +355,7 @@ int seq_client_alloc_fid(const struct lu_env *env, CERROR("%s: Can't allocate new sequence, " "rc %d\n", seq->lcs_name, rc); seq_fid_alloc_fini(seq); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); RETURN(rc); } @@ -353,7 +377,7 @@ int seq_client_alloc_fid(const struct lu_env *env, } *fid = seq->lcs_fid; - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); RETURN(rc); @@ -370,16 +394,16 @@ void seq_client_flush(struct lu_client_seq *seq) LASSERT(seq != NULL); cfs_waitlink_init(&link); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); while (seq->lcs_update) { cfs_waitq_add(&seq->lcs_waitq, &link); cfs_set_current_state(CFS_TASK_UNINT); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); cfs_waitq_wait(&link, CFS_TASK_UNINT); - cfs_mutex_lock(&seq->lcs_mutex); + mutex_lock(&seq->lcs_mutex); cfs_waitq_del(&seq->lcs_waitq, &link); cfs_set_current_state(CFS_TASK_RUNNING); } @@ -393,7 +417,7 @@ void seq_client_flush(struct lu_client_seq *seq) seq->lcs_space.lsr_index = -1; range_init(&seq->lcs_space); - cfs_mutex_unlock(&seq->lcs_mutex); + mutex_unlock(&seq->lcs_mutex); } EXPORT_SYMBOL(seq_client_flush); @@ -459,36 +483,37 @@ int seq_client_init(struct lu_client_seq *seq, const char *prefix, struct lu_server_seq *srv) { - int rc; - ENTRY; + int rc; + ENTRY; - LASSERT(seq != NULL); - LASSERT(prefix != NULL); + LASSERT(seq != NULL); + LASSERT(prefix != NULL); - seq->lcs_exp = exp; - seq->lcs_srv = srv; - seq->lcs_type = type; - cfs_mutex_init(&seq->lcs_mutex); - seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH; - cfs_waitq_init(&seq->lcs_waitq); + seq->lcs_srv = srv; + seq->lcs_type = type; - /* Make sure that things are clear before work is started. */ - seq_client_flush(seq); + mutex_init(&seq->lcs_mutex); + if (type == LUSTRE_SEQ_METADATA) + seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; - if (exp == NULL) { - LASSERT(seq->lcs_srv != NULL); - } else { - LASSERT(seq->lcs_exp != NULL); - seq->lcs_exp = class_export_get(seq->lcs_exp); - } + cfs_waitq_init(&seq->lcs_waitq); + /* Make sure that things are clear before work is started. */ + seq_client_flush(seq); - snprintf(seq->lcs_name, sizeof(seq->lcs_name), - "cli-%s", prefix); + if (exp != NULL) + seq->lcs_exp = class_export_get(exp); + else if (type == LUSTRE_SEQ_METADATA) + LASSERT(seq->lcs_srv != NULL); - rc = seq_client_proc_init(seq); - if (rc) - seq_client_fini(seq); - RETURN(rc); + snprintf(seq->lcs_name, sizeof(seq->lcs_name), + "cli-%s", prefix); + + rc = seq_client_proc_init(seq); + if (rc) + seq_client_fini(seq); + RETURN(rc); } EXPORT_SYMBOL(seq_client_init);