1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
51 /* assigns client to sequence controller node */
52 int seq_server_set_cli(struct lu_server_seq *seq,
53 struct lu_client_seq *cli,
54 const struct lu_env *env)
60 CDEBUG(D_INFO|D_WARNING, "%s: Detached "
61 "sequence client %s\n", seq->lss_name,
68 CERROR("%s: Sequence-controller is already "
69 "assigned\n", seq->lss_name);
73 CDEBUG(D_INFO|D_WARNING, "%s: Attached "
74 "sequence client %s\n", seq->lss_name,
77 /* asking client for new range, assign that range to ->seq_super and
78 * write seq state to backing store should be atomic. */
81 /* assign controller */
84 /* get new range from controller only if super-sequence is not yet
85 * initialized from backing store or something else. */
86 if (range_is_zero(&seq->lss_super)) {
87 rc = seq_client_alloc_super(cli, env);
90 CERROR("%s: Can't allocate super-sequence, "
91 "rc %d\n", seq->lss_name, rc);
95 /* take super-seq from client seq mgr */
96 LASSERT(range_is_sane(&cli->lcs_range));
98 seq->lss_super = cli->lcs_range;
100 /* save init seq to backing store. */
101 rc = seq_store_write(seq, env);
103 CERROR("%s: Can't write sequence state, "
104 "rc = %d\n", seq->lss_name, rc);
111 EXPORT_SYMBOL(seq_server_set_cli);
113 /* on controller node, allocate new super sequence for regular sequence
115 static int __seq_server_alloc_super(struct lu_server_seq *seq,
117 struct lu_range *out,
118 const struct lu_env *env)
120 struct lu_range *space = &seq->lss_space;
124 LASSERT(range_is_sane(space));
127 CDEBUG(D_INFO|D_WARNING, "%s: Recovery started. Use input "
128 "(last allocated) range "DRANGE"\n", seq->lss_name,
131 if (in->lr_start > space->lr_start)
132 space->lr_start = in->lr_start;
135 CDEBUG(D_INFO|D_WARNING, "%s: Recovery finished. Recovered "
136 "space: "DRANGE"\n", seq->lss_name, PRANGE(space));
138 if (range_space(space) < seq->lss_super_width) {
139 CWARN("%s: Sequences space to be exhausted soon. "
140 "Only "LPU64" sequences left\n", seq->lss_name,
143 space->lr_start = space->lr_end;
144 } else if (range_is_exhausted(space)) {
145 CERROR("%s: Sequences space is exhausted\n",
149 range_alloc(out, space, seq->lss_super_width);
153 rc = seq_store_write(seq, env);
155 CERROR("%s: Can't save state, rc %d\n", seq->lss_name, rc);
159 CDEBUG(D_INFO, "%s: Allocated super-sequence "
160 DRANGE"\n", seq->lss_name, PRANGE(out));
165 int seq_server_alloc_super(struct lu_server_seq *seq,
167 struct lu_range *out,
168 const struct lu_env *env)
174 rc = __seq_server_alloc_super(seq, in, out, env);
180 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
182 struct lu_range *out,
183 const struct lu_env *env)
185 struct lu_range *super = &seq->lss_super;
189 LASSERT(range_is_sane(super));
192 * This is recovery case. Adjust super range if input range looks like
193 * it is allocated from new super.
196 CDEBUG(D_INFO|D_WARNING, "%s: Recovery started. Use input "
197 "(last allocated) range "DRANGE"\n", seq->lss_name,
200 if (range_is_exhausted(super)) {
201 LASSERT(in->lr_start > super->lr_start);
204 * Server cannot send to client empty range, this is why
205 * we check here that range from client is "newer" than
208 super->lr_start = in->lr_start;
210 super->lr_end = super->lr_start +
211 LUSTRE_SEQ_SUPER_WIDTH;
214 * Update super start by start from client's range. End
215 * should not be changed if range was not exhausted.
217 if (in->lr_start > super->lr_start)
218 super->lr_start = in->lr_start;
223 CDEBUG(D_INFO|D_WARNING, "%s: Recovery finished. Recovered "
224 "super: "DRANGE"\n", seq->lss_name, PRANGE(super));
227 * XXX: avoid cascading RPCs using kind of async preallocation
228 * when meta-sequence is close to exhausting.
230 if (range_is_exhausted(super)) {
232 CERROR("%s: No sequence controller client "
233 "is setup\n", seq->lss_name);
237 rc = seq_client_alloc_super(seq->lss_cli, env);
239 CERROR("%s: Can't allocate super-sequence, "
240 "rc %d\n", seq->lss_name, rc);
244 /* saving new range into allocation space. */
245 *super = seq->lss_cli->lcs_range;
246 LASSERT(range_is_sane(super));
248 range_alloc(out, super, seq->lss_meta_width);
251 rc = seq_store_write(seq, env);
253 CERROR("%s: Can't save state, rc = %d\n",
258 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
259 DRANGE"\n", seq->lss_name, PRANGE(out));
265 int seq_server_alloc_meta(struct lu_server_seq *seq,
267 struct lu_range *out,
268 const struct lu_env *env)
274 rc = __seq_server_alloc_meta(seq, in, out, env);
280 static int seq_server_handle(struct lu_site *site,
281 const struct lu_env *env,
282 __u32 opc, struct lu_range *in,
283 struct lu_range *out)
290 if (!site->ls_server_seq) {
291 CERROR("Sequence server is not "
295 rc = seq_server_alloc_meta(site->ls_server_seq,
298 case SEQ_ALLOC_SUPER:
299 if (!site->ls_control_seq) {
300 CERROR("Sequence-controller is not "
304 rc = seq_server_alloc_super(site->ls_control_seq,
315 static int seq_req_handle(struct ptlrpc_request *req, const struct lu_env *env,
316 struct seq_thread_info *info)
318 struct lu_range *out, *in = NULL;
319 struct lu_site *site;
324 site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
325 LASSERT(site != NULL);
327 rc = req_capsule_pack(&info->sti_pill);
331 opc = req_capsule_client_get(&info->sti_pill,
334 out = req_capsule_server_get(&info->sti_pill,
339 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
340 in = req_capsule_client_get(&info->sti_pill,
343 LASSERT(!range_is_zero(in) && range_is_sane(in));
346 rc = seq_server_handle(site, env, *opc, in, out);
353 static void *seq_key_init(const struct lu_context *ctx,
354 struct lu_context_key *key)
356 struct seq_thread_info *info;
359 * check that no high order allocations are incurred.
361 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
364 info = ERR_PTR(-ENOMEM);
368 static void seq_key_fini(const struct lu_context *ctx,
369 struct lu_context_key *key, void *data)
371 struct seq_thread_info *info = data;
375 struct lu_context_key seq_thread_key = {
376 .lct_tags = LCT_MD_THREAD,
377 .lct_init = seq_key_init,
378 .lct_fini = seq_key_fini
381 static void seq_thread_info_init(struct ptlrpc_request *req,
382 struct seq_thread_info *info)
386 /* mark rep buffer as req-layout stuff expects */
387 for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
388 info->sti_rep_buf_size[i] = -1;
390 /* init request capsule */
391 req_capsule_init(&info->sti_pill, req, RCL_SERVER,
392 info->sti_rep_buf_size);
394 req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
397 static void seq_thread_info_fini(struct seq_thread_info *info)
399 req_capsule_fini(&info->sti_pill);
402 static int seq_handle(struct ptlrpc_request *req)
404 const struct lu_env *env;
405 struct seq_thread_info *info;
408 env = req->rq_svc_thread->t_env;
409 LASSERT(env != NULL);
411 info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
412 LASSERT(info != NULL);
414 seq_thread_info_init(req, info);
415 rc = seq_req_handle(req, env, info);
416 seq_thread_info_fini(info);
422 * Entry point for handling FLD RPCs called from MDT.
424 int seq_query(struct com_thread_info *info)
426 return seq_handle(info->cti_pill.rc_req);
428 EXPORT_SYMBOL(seq_query);
430 static void seq_server_proc_fini(struct lu_server_seq *seq);
433 static int seq_server_proc_init(struct lu_server_seq *seq)
438 seq->lss_proc_dir = lprocfs_register(seq->lss_name,
441 if (IS_ERR(seq->lss_proc_dir)) {
442 rc = PTR_ERR(seq->lss_proc_dir);
446 rc = lprocfs_add_vars(seq->lss_proc_dir,
447 seq_server_proc_list, seq);
449 CERROR("%s: Can't init sequence manager "
450 "proc, rc %d\n", seq->lss_name, rc);
451 GOTO(out_cleanup, rc);
457 seq_server_proc_fini(seq);
461 static void seq_server_proc_fini(struct lu_server_seq *seq)
464 if (seq->lss_proc_dir != NULL) {
465 if (!IS_ERR(seq->lss_proc_dir))
466 lprocfs_remove(seq->lss_proc_dir);
467 seq->lss_proc_dir = NULL;
472 static int seq_server_proc_init(struct lu_server_seq *seq)
477 static void seq_server_proc_fini(struct lu_server_seq *seq)
483 int seq_server_init(struct lu_server_seq *seq,
484 struct dt_device *dev,
486 enum lu_mgr_type type,
487 const struct lu_env *env)
489 int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
492 LASSERT(dev != NULL);
493 LASSERT(prefix != NULL);
496 seq->lss_type = type;
497 sema_init(&seq->lss_sem, 1);
499 seq->lss_super_width = LUSTRE_SEQ_SUPER_WIDTH;
500 seq->lss_meta_width = LUSTRE_SEQ_META_WIDTH;
502 snprintf(seq->lss_name, sizeof(seq->lss_name),
503 "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
505 seq->lss_space = LUSTRE_SEQ_SPACE_RANGE;
506 seq->lss_super = LUSTRE_SEQ_ZERO_RANGE;
508 rc = seq_store_init(seq, env, dev);
512 /* request backing store for saved sequence info */
513 rc = seq_store_read(seq, env);
514 if (rc == -ENODATA) {
515 CDEBUG(D_INFO|D_WARNING, "%s: No data found "
516 "on storage, %s\n", seq->lss_name,
517 is_srv ? "wait for controller attach" :
518 "this is first controller run");
520 CERROR("%s: Can't read sequence state, rc %d\n",
525 rc = seq_server_proc_init(seq);
532 seq_server_fini(seq, env);
535 EXPORT_SYMBOL(seq_server_init);
537 void seq_server_fini(struct lu_server_seq *seq,
538 const struct lu_env *env)
542 seq_server_proc_fini(seq);
543 seq_store_fini(seq, env);
547 EXPORT_SYMBOL(seq_server_fini);
549 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
551 static int __init fid_mod_init(void)
553 printk(KERN_INFO "Lustre: Sequence Manager; "
554 "info@clusterfs.com\n");
556 seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
559 if (IS_ERR(seq_type_proc_dir))
560 return PTR_ERR(seq_type_proc_dir);
562 lu_context_key_register(&seq_thread_key);
566 static void __exit fid_mod_exit(void)
568 lu_context_key_degister(&seq_thread_key);
569 if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
570 lprocfs_remove(seq_type_proc_dir);
571 seq_type_proc_dir = NULL;
575 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
576 MODULE_DESCRIPTION("Lustre FID Module");
577 MODULE_LICENSE("GPL");
579 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);