1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
51 /* Assigns client to sequence controller node. */
/*
 * seq_server_set_cli() - attach sequence client @cli to server @seq.
 *
 * @seq: server whose ->lss_cli slot is checked before attaching.
 * @cli: client being attached; its ->lcs_name appears in the log messages.
 * @env: caller's lu_env.
 *
 * Leaves through the out_up label with rc = -EINVAL when @seq->lss_cli is
 * already set. Exported for use by other modules.
 */
52 int seq_server_set_cli(struct lu_server_seq *seq,
53 struct lu_client_seq *cli,
54 const struct lu_env *env)
60 * Ask client for new range, assign that range to ->seq_space and write
61 * seq state to backing store should be atomic.
/*
 * NOTE(review): this message dereferences cli->lcs_name; confirm that the
 * condition guarding the detach path cannot be reached with cli == NULL.
 */
66 CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
67 seq->lss_name, cli->lcs_name);
/* Refuse to replace a controller client that is already attached. */
72 if (seq->lss_cli != NULL) {
73 CERROR("%s: Sequence controller is already "
74 "assigned\n", seq->lss_name);
75 GOTO(out_up, rc = -EINVAL);
78 CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
79 seq->lss_name, cli->lcs_name);
87 EXPORT_SYMBOL(seq_server_set_cli);
90 * On controller node, allocate new super sequence for regular sequence server.
/*
 * __seq_server_alloc_super() - carve a super-sequence out of @seq->lss_space.
 *
 * @seq: controller-side server; ->lss_space is the range allocated from and
 *       ->lss_width is the size requested from range_alloc().
 * @env: caller's lu_env, forwarded to seq_store_write().
 *
 * The range named "in" is an input range whose ->lr_end may push
 * ->lss_space.lr_start forward; "out" receives the allocated range.
 * The updated space is written back with seq_store_write() and its return
 * code is kept in rc.
 */
92 static int __seq_server_alloc_super(struct lu_server_seq *seq,
95 const struct lu_env *env)
97 struct lu_range *space = &seq->lss_space;
101 LASSERT(range_is_sane(space));
104 CDEBUG(D_INFO, "%s: Input seq range: "
105 DRANGE"\n", seq->lss_name, PRANGE(in));
/* Never hand out sequences below the end of the range reported in "in". */
107 if (in->lr_end > space->lr_start)
108 space->lr_start = in->lr_end;
111 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
112 seq->lss_name, PRANGE(space));
/* Fewer than ->lss_width sequences remain: warn and consume the remainder. */
114 if (range_space(space) < seq->lss_width) {
115 CWARN("%s: Sequences space to be exhausted soon. "
116 "Only "LPU64" sequences left\n", seq->lss_name,
119 space->lr_start = space->lr_end;
/*
 * NOTE(review): if range_is_exhausted() means range_space() == 0, this
 * branch is shadowed by the check above whenever ->lss_width > 0 - confirm
 * the intended ordering of the two tests.
 */
120 } else if (range_is_exhausted(space)) {
121 CERROR("%s: Sequences space is exhausted\n",
125 range_alloc(out, space, seq->lss_width);
/* Persist the updated allocation space. */
129 rc = seq_store_write(seq, env);
131 CERROR("%s: Can't write space data, rc %d\n",
136 CDEBUG(D_INFO, "%s: Allocated super-sequence "
137 DRANGE"\n", seq->lss_name, PRANGE(out));
/*
 * seq_server_alloc_super() - non-static entry point for super-sequence
 * allocation.
 *
 * Forwards @seq, the input range, @out and @env unchanged to
 * __seq_server_alloc_super() and keeps that helper's result in rc.
 */
142 int seq_server_alloc_super(struct lu_server_seq *seq,
144 struct lu_range *out,
145 const struct lu_env *env)
151 rc = __seq_server_alloc_super(seq, in, out, env);
/*
 * __seq_server_alloc_meta() - carve a meta-sequence out of @seq->lss_space.
 *
 * @seq: server; ->lss_space is the super range allocated from, ->lss_cli is
 *       the client used to talk to the controller, ->lss_width is the size
 *       requested from range_alloc().
 * @out: receives the allocated range.
 * @env: caller's lu_env, forwarded to the client and store calls.
 *
 * Two steps are visible: an adjustment of ->lss_space from the input range
 * "in" (with a replay towards the controller when the space is exhausted),
 * then the allocation itself, which first refills ->lss_space from the
 * controller if it is exhausted. The updated space is written back with
 * seq_store_write().
 */
157 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
159 struct lu_range *out,
160 const struct lu_env *env)
162 struct lu_range *space = &seq->lss_space;
166 LASSERT(range_is_sane(space));
169 * This is recovery case. Adjust super range if input range looks like
170 * it is allocated from new super.
173 CDEBUG(D_INFO, "%s: Input seq range: "
174 DRANGE"\n", seq->lss_name, PRANGE(in));
176 if (range_is_exhausted(space)) {
178 * Server cannot send empty range to client, this is why
179 * we check here that range from client is "newer" than
182 LASSERT(in->lr_end > space->lr_start);
185 * Start is set to end of last allocated, because it
186 * *is* already allocated so we take that into account
187 * and do not use for other allocations.
189 space->lr_start = in->lr_end;
192 * End is set to in->lr_start + super sequence
193 * allocation unit. That is because in->lr_start is
194 * first seq in new allocated range from controller
197 space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
200 CERROR("%s: No sequence controller "
201 "is attached.\n", seq->lss_name);
206 * Let controller know that this is recovery and last
207 * obtained range from it was @space.
209 rc = seq_client_replay_super(seq->lss_cli, space, env);
211 CERROR("%s: Can't replay super-sequence, "
212 "rc %d\n", seq->lss_name, rc);
217 * Update super start by end from client's range. Super
218 * end should not be changed if range was not exhausted.
220 if (in->lr_end > space->lr_start)
221 space->lr_start = in->lr_end;
226 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
227 seq->lss_name, PRANGE(space));
230 * XXX: Avoid cascading RPCs using kind of async preallocation
231 * when meta-sequence is close to exhausting.
/* Local super range is used up: obtain a fresh one through ->lss_cli. */
233 if (range_is_exhausted(space)) {
235 CERROR("%s: No sequence controller "
236 "is attached.\n", seq->lss_name);
240 rc = seq_client_alloc_super(seq->lss_cli, env);
242 CERROR("%s: Can't allocate super-sequence, "
243 "rc %d\n", seq->lss_name, rc);
247 /* Saving new range to allocation space. */
248 *space = seq->lss_cli->lcs_space;
249 LASSERT(range_is_sane(space));
252 range_alloc(out, space, seq->lss_width);
/* Persist the updated allocation space. */
255 rc = seq_store_write(seq, env);
257 CERROR("%s: Can't write space data, rc %d\n",
262 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
263 DRANGE"\n", seq->lss_name, PRANGE(out));
/*
 * seq_server_alloc_meta() - exported entry point for meta-sequence
 * allocation.
 *
 * Forwards @seq, the input range, @out and @env unchanged to
 * __seq_server_alloc_meta() and keeps that helper's result in rc.
 */
269 int seq_server_alloc_meta(struct lu_server_seq *seq,
271 struct lu_range *out,
272 const struct lu_env *env)
278 rc = __seq_server_alloc_meta(seq, in, out, env);
283 EXPORT_SYMBOL(seq_server_alloc_meta);
/*
 * seq_server_handle() - dispatch a sequence request by opcode.
 *
 * @site: lu_site whose ->ls_server_seq / ->ls_control_seq are the targets.
 * @env:  caller's lu_env.
 * @opc:  request opcode; SEQ_ALLOC_SUPER is one of the handled values.
 * @in:   input range passed through to the allocator.
 * @out:  range filled in by the allocator.
 *
 * Meta allocation goes to @site->ls_server_seq via seq_server_alloc_meta();
 * super allocation goes to @site->ls_control_seq via
 * seq_server_alloc_super(). Each target is checked for NULL and an error is
 * logged when it is missing.
 */
285 static int seq_server_handle(struct lu_site *site,
286 const struct lu_env *env,
287 __u32 opc, struct lu_range *in,
288 struct lu_range *out)
295 if (!site->ls_server_seq) {
296 CERROR("Sequence server is not "
300 rc = seq_server_alloc_meta(site->ls_server_seq,
303 case SEQ_ALLOC_SUPER:
304 if (!site->ls_control_seq) {
305 CERROR("Sequence controller is not "
309 rc = seq_server_alloc_super(site->ls_control_seq,
/*
 * seq_req_handle() - unpack a sequence RPC and run it.
 *
 * @req:  incoming request; the lu_site is reached through its export.
 * @env:  caller's lu_env.
 * @info: per-thread scratch data holding the request capsule (->sti_pill).
 *
 * Packs the reply, fetches the opcode from the client side of the capsule
 * and the output range from the server side, then calls
 * seq_server_handle(). The input range is fetched only when the request
 * message carries MSG_REPLAY; otherwise "in" stays NULL. Protocol-level
 * failures are reported through err_serious().
 */
320 static int seq_req_handle(struct ptlrpc_request *req,
321 const struct lu_env *env,
322 struct seq_thread_info *info)
324 struct lu_range *out, *in = NULL;
325 struct lu_site *site;
330 site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
331 LASSERT(site != NULL);
333 rc = req_capsule_pack(&info->sti_pill);
335 RETURN(err_serious(rc));
337 opc = req_capsule_client_get(&info->sti_pill,
340 out = req_capsule_server_get(&info->sti_pill,
343 RETURN(err_serious(-EPROTO));
345 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
346 in = req_capsule_client_get(&info->sti_pill,
/*
 * NOTE(review): "in" comes from the client side of the request; asserting
 * on its contents means a malformed replay would trip LASSERT rather than
 * produce an error reply - confirm this is intended.
 */
349 LASSERT(!range_is_zero(in) && range_is_sane(in));
352 rc = seq_server_handle(site, env, *opc, in, out);
354 rc = err_serious(-EPROTO);
/*
 * seq_key_init() - lct_init hook of seq_thread_key.
 *
 * Produces the per-context struct seq_thread_info. The compile-time
 * assertion keeps the structure within one page; on allocation failure the
 * result is ERR_PTR(-ENOMEM).
 */
359 static void *seq_key_init(const struct lu_context *ctx,
360 struct lu_context_key *key)
362 struct seq_thread_info *info;
365 * check that no high order allocations are incurred.
367 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
370 info = ERR_PTR(-ENOMEM);
/*
 * seq_key_fini() - lct_fini hook of seq_thread_key.
 *
 * @data is the per-context value, interpreted as a struct seq_thread_info.
 * NOTE(review): presumably releases what seq_key_init() allocated - confirm.
 */
374 static void seq_key_fini(const struct lu_context *ctx,
375 struct lu_context_key *key, void *data)
377 struct seq_thread_info *info = data;
/*
 * Context key under which seq_handle() looks up its struct seq_thread_info.
 * Tagged LCT_MD_THREAD, with seq_key_init()/seq_key_fini() as its init and
 * fini hooks.
 */
381 struct lu_context_key seq_thread_key = {
382 .lct_tags = LCT_MD_THREAD,
383 .lct_init = seq_key_init,
384 .lct_fini = seq_key_fini
/*
 * seq_thread_info_init() - prepare @info for handling @req.
 *
 * Sets every entry of ->sti_rep_buf_size to -1, initialises the request
 * capsule ->sti_pill on the server side (RCL_SERVER) with that array, and
 * selects the RQF_SEQ_QUERY request format.
 */
387 static void seq_thread_info_init(struct ptlrpc_request *req,
388 struct seq_thread_info *info)
392 /* Mark rep buffer as req-layout stuff expects */
393 for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
394 info->sti_rep_buf_size[i] = -1;
396 /* Init request capsule */
397 req_capsule_init(&info->sti_pill, req, RCL_SERVER,
398 info->sti_rep_buf_size);
400 req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
/*
 * seq_thread_info_fini() - counterpart of seq_thread_info_init(); finalises
 * the request capsule held in @info->sti_pill.
 */
403 static void seq_thread_info_fini(struct seq_thread_info *info)
405 req_capsule_fini(&info->sti_pill);
/*
 * seq_handle() - handle one sequence request on the current service thread.
 *
 * Takes the lu_env from @req->rq_svc_thread, looks up the per-thread
 * struct seq_thread_info through seq_thread_key, and brackets
 * seq_req_handle() with seq_thread_info_init()/seq_thread_info_fini().
 * rc carries the result of seq_req_handle().
 */
408 static int seq_handle(struct ptlrpc_request *req)
410 const struct lu_env *env;
411 struct seq_thread_info *info;
414 env = req->rq_svc_thread->t_env;
415 LASSERT(env != NULL);
417 info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
418 LASSERT(info != NULL);
420 seq_thread_info_init(req, info);
421 rc = seq_req_handle(req, env, info);
422 seq_thread_info_fini(info);
428 * Entry point for handling SEQ RPCs called from MDT.
/*
 * seq_query() - exported entry point for sequence requests.
 *
 * @info: caller's thread info; the request is taken from
 *        ->cti_pill.rc_req.
 *
 * Returns whatever seq_handle() returns for that request.
 */
430 int seq_query(struct com_thread_info *info)
432 return seq_handle(info->cti_pill.rc_req);
434 EXPORT_SYMBOL(seq_query);
/* Forward declaration: seq_server_proc_init() calls it on its error path. */
436 static void seq_server_proc_fini(struct lu_server_seq *seq);
/*
 * seq_server_proc_init() - create the proc entries of @seq.
 *
 * Registers a directory named after @seq->lss_name, stores it in
 * ->lss_proc_dir, and populates it from seq_server_proc_list with @seq as
 * the private data. A failed registration yields the PTR_ERR() code; a
 * failure to add the variables goes through out_cleanup, where
 * seq_server_proc_fini() undoes the registration.
 */
439 static int seq_server_proc_init(struct lu_server_seq *seq)
444 seq->lss_proc_dir = lprocfs_register(seq->lss_name,
447 if (IS_ERR(seq->lss_proc_dir)) {
448 rc = PTR_ERR(seq->lss_proc_dir);
452 rc = lprocfs_add_vars(seq->lss_proc_dir,
453 seq_server_proc_list, seq);
455 CERROR("%s: Can't init sequence manager "
456 "proc, rc %d\n", seq->lss_name, rc);
457 GOTO(out_cleanup, rc);
463 seq_server_proc_fini(seq);
/*
 * seq_server_proc_fini() - remove the proc entries of @seq.
 *
 * Does nothing when ->lss_proc_dir is NULL. An error pointer left behind by
 * a failed registration is not passed to lprocfs_remove(); in both cases
 * the field is reset to NULL afterwards.
 */
467 static void seq_server_proc_fini(struct lu_server_seq *seq)
470 if (seq->lss_proc_dir != NULL) {
471 if (!IS_ERR(seq->lss_proc_dir))
472 lprocfs_remove(&seq->lss_proc_dir);
473 seq->lss_proc_dir = NULL;
/*
 * Second definitions of the proc init/fini pair.
 * NOTE(review): presumably the variants used when proc support is compiled
 * out - confirm against the enclosing preprocessor conditional.
 */
478 static int seq_server_proc_init(struct lu_server_seq *seq)
483 static void seq_server_proc_fini(struct lu_server_seq *seq)
/*
 * seq_server_init() - set up sequence server or controller @seq.
 *
 * @seq:  structure to initialise.
 * @dev:  backing dt_device handed to seq_store_init(); must not be NULL.
 * @type: LUSTRE_SEQ_SERVER selects the "srv" flavour, anything else "ctl".
 * @env:  caller's lu_env.
 *
 * The name is built as "<srv|ctl>-<prefix>" and the allocation width is
 * LUSTRE_SEQ_META_WIDTH for a server, LUSTRE_SEQ_SUPER_WIDTH otherwise.
 * The saved space is read from the store; when nothing is stored
 * (-ENODATA) a default range is installed and written back. Proc entries
 * are created last. seq_server_fini() is called from this function to undo
 * the setup.
 */
489 int seq_server_init(struct lu_server_seq *seq,
490 struct dt_device *dev,
492 enum lu_mgr_type type,
493 const struct lu_env *env)
495 int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
498 LASSERT(dev != NULL);
499 LASSERT(prefix != NULL);
502 seq->lss_type = type;
503 range_zero(&seq->lss_space);
504 sema_init(&seq->lss_sem, 1);
506 seq->lss_width = is_srv ?
507 LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
509 snprintf(seq->lss_name, sizeof(seq->lss_name),
510 "%s-%s", (is_srv ? "srv" : "ctl"), prefix);
512 rc = seq_store_init(seq, env, dev);
516 /* Request backing store for saved sequence info. */
517 rc = seq_store_read(seq, env);
518 if (rc == -ENODATA) {
520 /* Nothing is read, init by default value. */
521 seq->lss_space = is_srv ?
522 LUSTRE_SEQ_ZERO_RANGE:
523 LUSTRE_SEQ_SPACE_RANGE;
525 CDEBUG(D_INFO, "%s: No data found "
526 "on store. Initialize space\n",
529 /* Save default controller value to store. */
530 rc = seq_store_write(seq, env);
532 CERROR("%s: Can't write space data, "
533 "rc %d\n", seq->lss_name, rc);
536 CERROR("%s: Can't read space data, rc %d\n",
/*
 * Two sanity checks on the loaded space follow; the second also rejects a
 * zero range. NOTE(review): confirm which flavour (srv/ctl) each applies to.
 */
542 LASSERT(range_is_sane(&seq->lss_space));
544 LASSERT(!range_is_zero(&seq->lss_space) &&
545 range_is_sane(&seq->lss_space));
548 rc = seq_server_proc_init(seq);
555 seq_server_fini(seq, env);
558 EXPORT_SYMBOL(seq_server_init);
/*
 * seq_server_fini() - tear down @seq.
 *
 * Removes the proc entries first, then releases the backing store with
 * seq_store_fini(). Exported for use by other modules.
 */
560 void seq_server_fini(struct lu_server_seq *seq,
561 const struct lu_env *env)
565 seq_server_proc_fini(seq);
566 seq_store_fini(seq, env);
570 EXPORT_SYMBOL(seq_server_fini);
/*
 * Module-wide proc directory registered under LUSTRE_SEQ_NAME by
 * fid_mod_init() and removed by fid_mod_exit().
 */
572 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
/*
 * fid_mod_init() - module load hook.
 *
 * Registers the LUSTRE_SEQ_NAME proc directory, returning the PTR_ERR()
 * code if that fails, then initialises and registers seq_thread_key.
 */
574 static int __init fid_mod_init(void)
576 seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
579 if (IS_ERR(seq_type_proc_dir))
580 return PTR_ERR(seq_type_proc_dir);
582 LU_CONTEXT_KEY_INIT(&seq_thread_key);
/*
 * NOTE(review): the result of lu_context_key_register() is discarded here;
 * if it can fail, the error is lost and the proc directory stays
 * registered - confirm.
 */
583 lu_context_key_register(&seq_thread_key);
/*
 * fid_mod_exit() - module unload hook.
 *
 * Deregisters seq_thread_key, then removes the module proc directory when
 * it holds a real entry (neither NULL nor an error pointer) and resets the
 * pointer to NULL.
 */
587 static void __exit fid_mod_exit(void)
589 lu_context_key_degister(&seq_thread_key);
590 if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
591 lprocfs_remove(&seq_type_proc_dir);
592 seq_type_proc_dir = NULL;
/*
 * Module metadata, followed by the cfs_module() declaration that names
 * fid_mod_init() and fid_mod_exit() for the "fid" module, version "0.1.0".
 */
596 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
597 MODULE_DESCRIPTION("Lustre FID Module");
598 MODULE_LICENSE("GPL");
600 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);