4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/fid/fid_request.c
38 * Lustre Sequence Manager
40 * Author: Yury Umanets <umka@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_FID
45 #include <linux/module.h>
46 #include <libcfs/libcfs.h>
48 #include <obd_class.h>
49 #include <obd_support.h>
50 #include <lustre_fid.h>
52 #include <lustre_mdc.h>
53 #include "fid_internal.h"
/*
 * Send a SEQ_QUERY request with operation code \a opc over seq->lcs_exp
 * and validate the sequence range handed back in \a output.
 *
 * The input range of the request carries seq->lcs_space.lsr_index and is
 * flagged as an MDT or OST range according to seq->lcs_type. The request
 * and reply portals depend on \a opc and on seq->lcs_type. A range that is
 * not sane, or that is already exhausted, is rejected with -EINVAL.
 */
55 static int seq_client_rpc(struct lu_client_seq *seq,
56 struct lu_seq_range *output, __u32 opc,
59 struct obd_export *exp = seq->lcs_exp;
60 struct ptlrpc_request *req;
61 struct lu_seq_range *out, *in;
63 unsigned int debug_mask;
67 LASSERT(exp != NULL && !IS_ERR(exp));
68 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
69 LUSTRE_MDS_VERSION, SEQ_QUERY);
73 /* Init operation code */
74 op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
77 /* Zero out input range, this is not recovery yet. */
78 in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
79 lu_seq_range_init(in);
81 ptlrpc_request_set_replen(req);
83 in->lsr_index = seq->lcs_space.lsr_index;
84 if (seq->lcs_type == LUSTRE_SEQ_METADATA)
85 fld_range_set_mdt(in);
87 fld_range_set_ost(in);
89 if (opc == SEQ_ALLOC_SUPER) {
90 req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
91 req->rq_reply_portal = MDC_REPLY_PORTAL;
92 /* While allocating a super sequence for a data object,
93 * the current thread might hold the export of MDT0 (MDT0
94 * precreating objects on this OST), and it will send the
95 * request to MDT0 here, so we cannot keep resending the
96 * request here; otherwise, if MDT0 has failed (umounted),
97 * it cannot release the export of MDT0 */
98 if (seq->lcs_type == LUSTRE_SEQ_DATA)
99 req->rq_no_delay = req->rq_no_resend = 1;
100 debug_mask = D_CONSOLE;
102 if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
103 req->rq_reply_portal = MDC_REPLY_PORTAL;
104 req->rq_request_portal = SEQ_METADATA_PORTAL;
106 req->rq_reply_portal = OSC_REPLY_PORTAL;
107 req->rq_request_portal = SEQ_DATA_PORTAL;
113 /* Allow seq client RPC during recovery time. */
114 req->rq_allow_replay = 1;
116 ptlrpc_at_set_req_timeout(req);
118 rc = ptlrpc_queue_wait(req);
123 out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
/* NOTE(review): the checks below inspect \a output while the reply buffer
 * was fetched into "out"; presumably the reply is copied into \a output
 * first -- confirm. */
126 if (!lu_seq_range_is_sane(output)) {
127 CERROR("%s: Invalid range received from server: "
128 DRANGE"\n", seq->lcs_name, PRANGE(output));
129 GOTO(out_req, rc = -EINVAL);
/* NOTE(review): the "]" after DRANGE here and in the CDEBUG_LIMIT below
 * looks stray compared with the first message -- check the DRANGE format. */
132 if (lu_seq_range_is_exhausted(output)) {
133 CERROR("%s: Range received from server is exhausted: "
134 DRANGE"]\n", seq->lcs_name, PRANGE(output));
135 GOTO(out_req, rc = -EINVAL);
138 CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n",
139 seq->lcs_name, opcname, PRANGE(output));
143 ptlrpc_req_finished(req);
147 /* Request sequence-controller node to allocate new super-sequence.
 *
 * The visible work runs under seq->lcs_mutex and uses seq->lcs_space as
 * the output range. When built with HAVE_SEQ_SERVER the code can call
 * seq_server_alloc_super() on seq->lcs_srv. The RPC path calls
 * seq_client_rpc() with SEQ_ALLOC_SUPER, and returns -EINPROGRESS without
 * sending anything while seq->lcs_exp is still NULL.
 */
148 int seq_client_alloc_super(struct lu_client_seq *seq,
149 const struct lu_env *env)
154 mutex_lock(&seq->lcs_mutex);
157 #ifdef HAVE_SEQ_SERVER
158 LASSERT(env != NULL);
159 rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space,
165 /* Check whether the connection to seq controller has been
166 * setup (lcs_exp != NULL) */
167 if (seq->lcs_exp == NULL) {
168 mutex_unlock(&seq->lcs_mutex);
169 RETURN(-EINPROGRESS);
172 rc = seq_client_rpc(seq, &seq->lcs_space,
173 SEQ_ALLOC_SUPER, "super");
175 mutex_unlock(&seq->lcs_mutex);
179 /* Request sequence-controller node to allocate new meta-sequence.
 *
 * When built with HAVE_SEQ_SERVER the code can call
 * seq_server_alloc_meta() on seq->lcs_srv. The RPC path issues
 * SEQ_ALLOC_META through seq_client_rpc(), with seq->lcs_space as the
 * output range, and repeats the call for as long as the result is
 * -EINPROGRESS or -EAGAIN; no retry limit is visible in that loop.
 */
180 static int seq_client_alloc_meta(const struct lu_env *env,
181 struct lu_client_seq *seq)
187 #ifdef HAVE_SEQ_SERVER
188 LASSERT(env != NULL);
189 rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env);
195 /* If meta server return -EINPROGRESS or EAGAIN,
196 * it means meta server might not be ready to
197 * allocate super sequence from sequence controller
199 rc = seq_client_rpc(seq, &seq->lcs_space,
200 SEQ_ALLOC_META, "meta");
201 } while (rc == -EINPROGRESS || rc == -EAGAIN);
207 /* Allocate new sequence for client.
 *
 * Takes the next sequence number from the front of seq->lcs_space
 * (lsr_start), stores it in *seqnr and advances lsr_start by one. If the
 * range is exhausted on entry, seq_client_alloc_meta() is called first to
 * obtain a new range; a failure of that call is logged with CERROR.
 */
208 static int seq_client_alloc_seq(const struct lu_env *env,
209 struct lu_client_seq *seq, u64 *seqnr)
214 LASSERT(lu_seq_range_is_sane(&seq->lcs_space));
216 if (lu_seq_range_is_exhausted(&seq->lcs_space)) {
217 rc = seq_client_alloc_meta(env, seq);
219 CERROR("%s: Can't allocate new meta-sequence,"
220 "rc %d\n", seq->lcs_name, rc);
223 CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
224 seq->lcs_name, PRANGE(&seq->lcs_space));
230 LASSERT(!lu_seq_range_is_exhausted(&seq->lcs_space));
231 *seqnr = seq->lcs_space.lsr_start;
232 seq->lcs_space.lsr_start += 1;
234 CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
/*
 * Preparation step taken before allocating a new sequence.
 *
 * Callers in this file hold seq->lcs_mutex when they call this. If
 * seq->lcs_update is set, the caller is queued on seq->lcs_waitq through
 * \a link, the mutex is dropped around the wait and re-taken afterwards,
 * and the task state is restored to TASK_RUNNING. Otherwise the mutex is
 * released before returning.
 * NOTE(review): callers retry while the return value is non-zero, so the
 * wait path presumably returns non-zero with the mutex held -- confirm.
 */
240 static int seq_fid_alloc_prep(struct lu_client_seq *seq,
243 if (seq->lcs_update) {
244 add_wait_queue(&seq->lcs_waitq, link);
245 set_current_state(TASK_UNINTERRUPTIBLE);
246 mutex_unlock(&seq->lcs_mutex);
250 mutex_lock(&seq->lcs_mutex);
251 remove_wait_queue(&seq->lcs_waitq, link);
252 set_current_state(TASK_RUNNING);
257 mutex_unlock(&seq->lcs_mutex);
/*
 * Completion step taken after a sequence allocation attempt.
 *
 * Asserts that seq->lcs_update is 1, re-takes seq->lcs_mutex, records
 * \a seqnr as the current sequence in seq->lcs_fid, resets f_ver to 0 and
 * wakes every waiter on seq->lcs_waitq. The mutex is not released in the
 * visible code; the callers in this file unlock it afterwards.
 * The object id is set either to the maximum width for the client type or
 * to LUSTRE_FID_INIT_OID. Callers pass true as the third argument when
 * they take the whole sequence; presumably that selects the maximum-width
 * branch (TODO confirm).
 */
262 static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr,
265 LASSERT(seq->lcs_update == 1);
267 mutex_lock(&seq->lcs_mutex);
269 CDEBUG(D_INFO, "%s: New sequence [0x%16.16"LPF64"x]\n",
270 seq->lcs_name, seqnr);
272 seq->lcs_fid.f_seq = seqnr;
274 /* Since the caller requires the whole seq,
275 * mark this seq as used */
276 if (seq->lcs_type == LUSTRE_SEQ_METADATA)
278 LUSTRE_METADATA_SEQ_MAX_WIDTH;
280 seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
282 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
284 seq->lcs_fid.f_ver = 0;
288 wake_up_all(&seq->lcs_waitq);
292 * Allocate a whole unused sequence to the caller.
294 * \param[in] env pointer to the thread context
295 * \param[in,out] seq pointer to the client sequence manager
296 * \param[out] seqnr to hold the newly allocated sequence
298 * \retval 0 for new sequence allocated.
299 * \retval Negative error number on failure.
301 int seq_client_get_seq(const struct lu_env *env,
302 struct lu_client_seq *seq, u64 *seqnr)
/* Allocation is bracketed by seq_fid_alloc_prep() and
 * seq_fid_alloc_fini(), the same pair seq_client_alloc_fid() uses. */
307 LASSERT(seqnr != NULL);
309 mutex_lock(&seq->lcs_mutex);
310 init_waitqueue_entry(&link, current);
312 /* To guarantee that we can get a whole non-used sequence. */
313 while (seq_fid_alloc_prep(seq, &link) != 0);
/* seq_fid_alloc_prep() unlocks seq::lcs_mutex on its non-waiting path,
 * so the allocation below runs without the mutex. */
315 rc = seq_client_alloc_seq(env, seq, seqnr);
/* Re-takes seq::lcs_mutex; on failure sequence 0 is recorded. The "true"
 * argument marks that the caller takes the whole sequence. */
316 seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true);
318 CERROR("%s: Can't allocate new sequence: rc = %d\n",
320 mutex_unlock(&seq->lcs_mutex);
324 EXPORT_SYMBOL(seq_client_get_seq);
327 * Allocate new fid on passed client @seq and save it to @fid.
329 * \param[in] env pointer to the thread context
330 * \param[in,out] seq pointer to the client sequence manager
331 * \param[out] fid to hold the newly allocated fid
333 * \retval 1 to notify the caller that a sequence switch
334 * was performed, so that it can set up FLD for the new sequence.
335 * \retval 0 for new FID allocated in current sequence.
336 * \retval Negative error number on failure.
338 int seq_client_alloc_fid(const struct lu_env *env,
339 struct lu_client_seq *seq, struct lu_fid *fid)
345 LASSERT(seq != NULL);
346 LASSERT(fid != NULL);
348 init_waitqueue_entry(&link, current);
349 mutex_lock(&seq->lcs_mutex);
/* Fault injection: with f_oid set to lcs_width, the
 * "f_oid < lcs_width" test below fails, so the code takes the path that
 * allocates a new sequence. */
351 if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
352 seq->lcs_fid.f_oid = seq->lcs_width;
/* NOTE(review): this branch is taken whenever the current sequence still
 * has unused object ids, yet it is annotated unlikely() -- confirm that
 * the hint is intended. */
357 if (unlikely(!fid_is_zero(&seq->lcs_fid) &&
358 fid_oid(&seq->lcs_fid) < seq->lcs_width)) {
359 /* Just bump last allocated fid and return to caller. */
360 seq->lcs_fid.f_oid++;
365 /* Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid
366 * deadlock during seq_client_alloc_seq(). */
367 rc = seq_fid_alloc_prep(seq, &link);
371 rc = seq_client_alloc_seq(env, seq, &seqnr);
372 /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */
373 seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false);
375 CERROR("%s: Can't allocate new sequence: rc = %d\n",
377 mutex_unlock(&seq->lcs_mutex);
387 mutex_unlock(&seq->lcs_mutex);
389 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
393 EXPORT_SYMBOL(seq_client_alloc_fid);
396 * Finish the current sequence due to disconnect.
397 * See mdc_import_event()
399 void seq_client_flush(struct lu_client_seq *seq)
403 LASSERT(seq != NULL);
404 init_waitqueue_entry(&link, current);
405 mutex_lock(&seq->lcs_mutex);
/* Wait until seq->lcs_update is clear (the same flag
 * seq_fid_alloc_prep() waits on), dropping seq::lcs_mutex around each
 * wait and re-taking it afterwards. */
407 while (seq->lcs_update) {
408 add_wait_queue(&seq->lcs_waitq, &link);
409 set_current_state(TASK_UNINTERRUPTIBLE);
410 mutex_unlock(&seq->lcs_mutex);
414 mutex_lock(&seq->lcs_mutex);
415 remove_wait_queue(&seq->lcs_waitq, &link);
416 set_current_state(TASK_RUNNING);
/* Reset the cached FID and the sequence range while holding the mutex. */
419 fid_zero(&seq->lcs_fid);
421 * this id shld not be used for seq range allocation.
422 * set to -1 for dgb check.
425 seq->lcs_space.lsr_index = -1;
427 lu_seq_range_init(&seq->lcs_space);
428 mutex_unlock(&seq->lcs_mutex);
430 EXPORT_SYMBOL(seq_client_flush);
/*
 * Remove the proc directory of \a seq, if there is one.
 *
 * Under CONFIG_PROC_FS, a non-NULL seq->lcs_proc_dir is passed to
 * lprocfs_remove() unless it holds an error pointer, and the field is
 * then reset to NULL.
 */
432 static void seq_client_proc_fini(struct lu_client_seq *seq)
434 #ifdef CONFIG_PROC_FS
436 if (seq->lcs_proc_dir) {
437 if (!IS_ERR(seq->lcs_proc_dir))
438 lprocfs_remove(&seq->lcs_proc_dir);
439 seq->lcs_proc_dir = NULL;
442 #endif /* CONFIG_PROC_FS */
/*
 * Create the proc entries of \a seq.
 *
 * Under CONFIG_PROC_FS, registers a directory named seq->lcs_name below
 * seq_type_proc_dir and adds the seq_client_proc_list variables to it.
 * A registration failure yields PTR_ERR() of the returned pointer; a
 * failure to add the variables jumps to out_cleanup, and
 * seq_client_proc_fini() is the cleanup call visible there.
 */
445 static int seq_client_proc_init(struct lu_client_seq *seq)
447 #ifdef CONFIG_PROC_FS
451 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, seq_type_proc_dir,
453 if (IS_ERR(seq->lcs_proc_dir)) {
454 CERROR("%s: LProcFS failed in seq-init\n",
456 rc = PTR_ERR(seq->lcs_proc_dir);
460 rc = lprocfs_add_vars(seq->lcs_proc_dir, seq_client_proc_list, seq);
462 CERROR("%s: Can't init sequence manager "
463 "proc, rc %d\n", seq->lcs_name, rc);
464 GOTO(out_cleanup, rc);
470 seq_client_proc_fini(seq);
473 #else /* !CONFIG_PROC_FS */
475 #endif /* CONFIG_PROC_FS */
/*
 * Initialise the client sequence manager \a seq.
 *
 * Records \a type and picks the matching maximum sequence width,
 * initialises the mutex and the wait queue, resets the FID and range
 * state through seq_client_flush(), takes a reference on \a exp with
 * class_export_get(), formats seq->lcs_name and creates the proc entries
 * via seq_client_proc_init().
 * NOTE(review): seq_client_fini() follows seq_client_proc_init();
 * presumably it runs only when that call fails -- confirm.
 */
478 int seq_client_init(struct lu_client_seq *seq,
479 struct obd_export *exp,
480 enum lu_cli_type type,
482 struct lu_server_seq *srv)
487 LASSERT(seq != NULL);
488 LASSERT(prefix != NULL);
491 seq->lcs_type = type;
493 mutex_init(&seq->lcs_mutex);
494 if (type == LUSTRE_SEQ_METADATA)
495 seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH;
497 seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
499 init_waitqueue_head(&seq->lcs_waitq);
500 /* Make sure that things are clear before work is started. */
501 seq_client_flush(seq);
504 seq->lcs_exp = class_export_get(exp);
506 snprintf(seq->lcs_name, sizeof(seq->lcs_name),
509 rc = seq_client_proc_init(seq);
511 seq_client_fini(seq);
514 EXPORT_SYMBOL(seq_client_init);
/*
 * Tear down the client sequence manager \a seq.
 *
 * Removes the proc entries through seq_client_proc_fini() and, when
 * seq->lcs_exp is set, drops the export reference with
 * class_export_put().
 */
516 void seq_client_fini(struct lu_client_seq *seq)
520 seq_client_proc_fini(seq);
522 if (seq->lcs_exp != NULL) {
523 class_export_put(seq->lcs_exp);
530 EXPORT_SYMBOL(seq_client_fini);
/*
 * Allocate and initialise the client sequence manager of \a obd.
 *
 * Allocates obd->u.cli.cl_seq, builds the name prefix "cli-<obd_name>"
 * in a temporary buffer and passes it to seq_client_init() together with
 * \a exp and \a type (the server argument is NULL). The temporary prefix
 * buffer is freed right after that call. Error paths jump to
 * out_free_seq, where cl_seq is freed.
 */
532 int client_fid_init(struct obd_device *obd,
533 struct obd_export *exp, enum lu_cli_type type)
535 struct client_obd *cli = &obd->u.cli;
540 OBD_ALLOC_PTR(cli->cl_seq);
541 if (cli->cl_seq == NULL)
/* Presumably the extra 5 bytes hold the "cli-" prefix plus the NUL. */
544 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
546 GOTO(out_free_seq, rc = -ENOMEM);
548 snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name);
550 /* Init client side sequence-manager */
551 rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL);
552 OBD_FREE(prefix, MAX_OBD_NAME + 5);
554 GOTO(out_free_seq, rc);
558 OBD_FREE_PTR(cli->cl_seq);
562 EXPORT_SYMBOL(client_fid_init);
/*
 * Release the client sequence manager of \a obd.
 *
 * When obd->u.cli.cl_seq is set, it is torn down with seq_client_fini()
 * and then freed.
 */
564 int client_fid_fini(struct obd_device *obd)
566 struct client_obd *cli = &obd->u.cli;
569 if (cli->cl_seq != NULL) {
570 seq_client_fini(cli->cl_seq);
571 OBD_FREE_PTR(cli->cl_seq);
577 EXPORT_SYMBOL(client_fid_fini);
/* Proc directory under which seq_client_proc_init() registers the
 * per-client directories. */
579 struct proc_dir_entry *seq_type_proc_dir;
/*
 * Module initialisation: registers the LUSTRE_SEQ_NAME proc directory and
 * returns PTR_ERR() of the result if registration fails. With
 * HAVE_SERVER_SUPPORT, fid_server_mod_init() is called as well.
 * NOTE(review): the return value of fid_server_mod_init() is not used in
 * the visible code -- confirm whether it can fail.
 */
581 static int __init fid_init(void)
583 seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
586 if (IS_ERR(seq_type_proc_dir))
587 return PTR_ERR(seq_type_proc_dir);
589 # ifdef HAVE_SERVER_SUPPORT
590 fid_server_mod_init();
/*
 * Module cleanup: with HAVE_SERVER_SUPPORT calls fid_server_mod_exit(),
 * then removes seq_type_proc_dir when it is neither NULL nor an error
 * pointer and resets it to NULL.
 */
596 static void __exit fid_exit(void)
598 # ifdef HAVE_SERVER_SUPPORT
599 fid_server_mod_exit();
602 if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
603 lprocfs_remove(&seq_type_proc_dir);
604 seq_type_proc_dir = NULL;
/* Module metadata, and registration of fid_init()/fid_exit() as the
 * module's init and exit entry points. */
608 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
609 MODULE_DESCRIPTION("Lustre File IDentifier");
610 MODULE_VERSION(LUSTRE_VERSION_STRING);
611 MODULE_LICENSE("GPL");
613 module_init(fid_init);
614 module_exit(fid_exit);