1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
/* NOTE(review): this chunk is a sampled copy -- embedded line numbers skip
 * (53 -> 57), so the struct initializer bodies are missing from this view.
 * Presumably SPACE_RANGE spans [0x400, max) and ZERO_RANGE is {0, 0} --
 * confirm against the full file before editing. */
51 /* sequence space, starts from 0x400 to have first 0x400 sequences used for
52 * special purposes. */
53 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
57 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
59 /* zero range, used for init and other purposes */
60 const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
64 EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
/* NOTE(review): truncated view -- embedded line numbers skip (69 -> 75,
 * 77 -> 83, ...), so locking, branch structure, and RETURN paths are not
 * visible here.  Comments below describe only what the visible lines show. */
66 /* assigns client to sequence controller node */
67 int seq_server_set_cli(struct lu_server_seq *seq,
68 struct lu_client_seq *cli,
69 const struct lu_context *ctx)
/* Detach path: a NULL @cli apparently detaches the current client
 * (branch condition not visible in this view). */
75 CDEBUG(D_INFO|D_WARNING, "%s: detached "
76 "sequence mgr client %s\n", seq->lss_name,
77 cli->lcs_exp->exp_client_uuid.uuid);
/* Refuse to attach when a controller client is already set. */
83 CERROR("%s: sequence-controller is already "
84 "assigned\n", seq->lss_name);
88 CDEBUG(D_INFO|D_WARNING, "%s: attached "
89 "sequence client %s\n", seq->lss_name,
90 cli->lcs_exp->exp_client_uuid.uuid);
92 /* asking client for new range, assign that range to ->seq_super and
93 * write seq state to backing store should be atomic. */
96 /* assign controller */
99 /* get new range from controller only if super-sequence is not yet
100 * initialized from backing store or something else. */
101 if (range_is_zero(&seq->lss_super)) {
102 rc = seq_client_alloc_super(cli);
104 CERROR("can't allocate super-sequence, "
109 /* take super-seq from client seq mgr */
110 LASSERT(range_is_sane(&cli->lcs_range));
112 seq->lss_super = cli->lcs_range;
114 /* save init seq to backing store. */
115 rc = seq_store_write(seq, ctx);
117 CERROR("can't write sequence state, "
127 EXPORT_SYMBOL(seq_server_set_cli);
/* NOTE(review): truncated view -- error returns, braces and the function's
 * RETURN statement are missing between the visible lines; do not treat as
 * compilable.  Caller seq_server_alloc_super() presumably holds lss_sem
 * around this helper -- confirm against the full file. */
129 /* on controller node, allocate new super sequence for regular sequence
131 static int __seq_server_alloc_super(struct lu_server_seq *seq,
132 struct lu_range *range,
133 const struct lu_context *ctx)
134 {
135 struct lu_range *space = &seq->lss_space;
139 LASSERT(range_is_sane(space));
/* Warn (but still allocate) when fewer than one super-width of
 * sequences remains in the global space. */
141 if (range_space(space) < seq->lss_super_width) {
142 CWARN("sequences space is going to exhaust soon. "
143 "Can allocate only "LPU64" sequences\n",
146 space->lr_start = space->lr_end;
148 } else if (range_is_exhausted(space)) {
149 CERROR("sequences space is exhausted\n");
/* Carve @lss_super_width sequences out of the space into *range. */
152 range_alloc(range, space, seq->lss_super_width);
/* Persist the shrunk space so a restart does not re-issue it. */
157 rc = seq_store_write(seq, ctx);
159 CERROR("can't save state, rc = %d\n",
165 CDEBUG(D_INFO, "%s: allocated super-sequence "
166 DRANGE"\n", seq->lss_name, PRANGE(range));
/* Locked wrapper around __seq_server_alloc_super() -- the down/up of
 * lss_sem and the RETURN are in lines missing from this truncated view
 * (174 -> 180); confirm against the full file. */
172 static int seq_server_alloc_super(struct lu_server_seq *seq,
173 struct lu_range *range,
174 const struct lu_context *ctx)
180 rc = __seq_server_alloc_super(seq, range, ctx);
/* Allocate a meta-sequence range out of this server's super-sequence,
 * refilling the super-sequence from the controller (via lss_cli) when it
 * is exhausted.  NOTE(review): truncated view -- error paths, braces and
 * RETURN statements are missing between the visible lines. */
186 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
187 struct lu_range *range,
188 const struct lu_context *ctx)
190 struct lu_range *super = &seq->lss_super;
194 LASSERT(range_is_sane(super));
196 /* XXX: here we should avoid cascading RPCs using kind of async
197 * preallocation when meta-sequence is close to exhausting. */
198 if (range_is_exhausted(super)) {
/* No controller client attached -- cannot refill; error path
 * (return) not visible in this view. */
200 CERROR("no seq-controller client is setup\n");
/* Synchronous RPC to the controller for a fresh super-sequence. */
204 rc = seq_client_alloc_super(seq->lss_cli);
206 CERROR("can't allocate new super-sequence, "
211 /* saving new range into allocation space. */
212 *super = seq->lss_cli->lcs_range;
213 LASSERT(range_is_sane(super));
/* Carve lss_meta_width sequences from the super-sequence into *range. */
215 range_alloc(range, super, seq->lss_meta_width);
/* Persist updated state so a restart does not re-issue the range. */
217 rc = seq_store_write(seq, ctx);
219 CERROR("can't save state, rc = %d\n",
224 CDEBUG(D_INFO, "%s: allocated meta-sequence "
225 DRANGE"\n", seq->lss_name, PRANGE(range));
/* Locked wrapper around __seq_server_alloc_meta() -- the lss_sem
 * down/up and RETURN are in lines missing from this truncated view
 * (233 -> 239); confirm against the full file. */
231 static int seq_server_alloc_meta(struct lu_server_seq *seq,
232 struct lu_range *range,
233 const struct lu_context *ctx)
239 rc = __seq_server_alloc_meta(seq, range, ctx);
/* Core SEQ_QUERY handler: unpack the opcode from the request capsule and
 * dispatch to meta (regular server) or super (controller) allocation,
 * writing the allocated range into the reply buffer.  NOTE(review):
 * truncated view -- the switch statement itself, error returns, and the
 * reply-range assignment are in missing lines; only the visible skeleton
 * is annotated here. */
245 static int seq_req_handle0(const struct lu_context *ctx,
246 struct ptlrpc_request *req,
247 struct seq_thread_info *info)
249 struct lu_site *site;
250 struct lu_range *out;
255 site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
256 LASSERT(site != NULL);
258 req_capsule_pack(&info->sti_pill);
/* opc selects SEQ_ALLOC_META vs SEQ_ALLOC_SUPER (switch not visible). */
260 opc = req_capsule_client_get(&info->sti_pill,
/* out points at the reply buffer the allocated range is returned in. */
263 out = req_capsule_server_get(&info->sti_pill,
270 if (!site->ls_server_seq) {
271 CERROR("sequence-server is not "
275 rc = seq_server_alloc_meta(site->ls_server_seq,
278 case SEQ_ALLOC_SUPER:
279 if (!site->ls_control_seq) {
280 CERROR("sequence-controller is not "
284 rc = seq_server_alloc_super(site->ls_control_seq,
/* default: unknown opcode from the wire. */
288 CERROR("wrong opc %#x\n", *opc);
/* lu_context_key init callback: allocate per-thread seq_thread_info.
 * NOTE(review): truncated view -- the OBD_ALLOC call and return of the
 * successfully-allocated info are in missing lines (304 -> 307). */
296 static void *seq_thread_init(const struct lu_context *ctx,
297 struct lu_context_key *key)
299 struct seq_thread_info *info;
/* Compile-time guard: info must fit in one page so the allocation
302 * check that no high order allocations are incurred.
304 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
/* ERR_PTR(-ENOMEM) is the failure return (allocation path not visible). */
307 info = ERR_PTR(-ENOMEM);
/* lu_context_key fini callback: free the per-thread seq_thread_info
 * allocated by seq_thread_init() (the OBD_FREE call is in lines missing
 * from this truncated view). */
311 static void seq_thread_fini(const struct lu_context *ctx,
312 struct lu_context_key *key, void *data)
314 struct seq_thread_info *info = data;
/* Context key binding the per-thread seq_thread_info to MD service
 * threads; init/fini callbacks are defined above. */
318 struct lu_context_key seq_thread_key = {
319 .lct_tags = LCT_MD_THREAD,
320 .lct_init = seq_thread_init,
321 .lct_fini = seq_thread_fini
/* Prepare per-request state: reset reply buffer sizes and initialize the
 * request capsule for the SEQ_QUERY layout.  NOTE(review): truncated view
 * -- the loop-variable declaration and braces are in missing lines. */
324 static void seq_thread_info_init(struct ptlrpc_request *req,
325 struct seq_thread_info *info)
329 /* mark rep buffer as req-layout stuff expects */
330 for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
331 info->sti_rep_buf_size[i] = -1;
333 /* init request capsule */
334 req_capsule_init(&info->sti_pill, req, RCL_SERVER,
335 info->sti_rep_buf_size);
337 req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
/* Tear down the request capsule initialized by seq_thread_info_init(). */
340 static void seq_thread_info_fini(struct seq_thread_info *info)
342 req_capsule_fini(&info->sti_pill);
/* ptlrpc service handler entry point for the SEQ service: validate the
 * request, fetch per-thread info from the thread context, dispatch
 * SEQ_QUERY to seq_req_handle0(), and send the reply.  NOTE(review):
 * truncated view -- braces, the swab/unpack step, rc initialization and
 * the final RETURN are in lines missing from this chunk. */
345 static int seq_req_handle(struct ptlrpc_request *req)
347 const struct lu_context *ctx;
348 struct seq_thread_info *info;
/* Fault-injection hook: optionally drop the reply once for testing. */
352 OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
354 ctx = req->rq_svc_thread->t_ctx;
355 LASSERT(ctx != NULL);
356 LASSERT(ctx->lc_thread == req->rq_svc_thread);
/* Per-thread scratch state registered via seq_thread_key. */
358 info = lu_context_key_get(ctx, &seq_thread_key);
359 LASSERT(info != NULL);
361 seq_thread_info_init(req, info);
363 if (req->rq_reqmsg->opc == SEQ_QUERY) {
364 if (req->rq_export != NULL) {
366 * no need to return error here and overwrite @rc, this
367 * function should return 0 even if seq_req_handle0()
368 * returns some error code.
370 seq_req_handle0(ctx, req, info);
/* Request arrived without a connected export. */
372 CERROR("Unconnected request\n");
373 req->rq_status = -ENOTCONN;
376 CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
377 req->rq_status = -ENOTSUPP;
378 rc = ptlrpc_error(req);
382 target_send_reply(req, rc, OBD_FAIL_SEQ_ALL_REPLY_NET);
385 seq_thread_info_fini(info);
/* procfs-enabled build: register /proc entries for this sequence server.
 * NOTE(review): truncated view -- the surrounding #ifdef LPROCFS guard,
 * lprocfs_register() parent/variable arguments and RETURN paths are in
 * missing lines; the non-procfs stubs appear further below. */
389 static void seq_server_proc_fini(struct lu_server_seq *seq);
392 static int seq_server_proc_init(struct lu_server_seq *seq)
/* Top-level directory named after the sequence manager instance. */
397 seq->lss_proc_dir = lprocfs_register(seq->lss_name,
400 if (IS_ERR(seq->lss_proc_dir)) {
401 CERROR("LProcFS failed in seq-init\n");
402 rc = PTR_ERR(seq->lss_proc_dir);
/* "services" subdirectory under the instance directory. */
406 seq->lss_proc_entry = lprocfs_register("services",
409 if (IS_ERR(seq->lss_proc_entry)) {
410 CERROR("LProcFS failed in seq-init\n");
411 rc = PTR_ERR(seq->lss_proc_entry);
412 GOTO(out_cleanup, rc);
/* Populate the directory with the server's proc variables. */
415 rc = lprocfs_add_vars(seq->lss_proc_dir,
416 seq_server_proc_list, seq);
418 CERROR("can't init sequence manager "
419 "proc, rc %d\n", rc);
420 GOTO(out_cleanup, rc);
/* out_cleanup: unwind partial registration on any failure. */
426 seq_server_proc_fini(seq);
/* Unregister the proc entries created by seq_server_proc_init().
 * Both pointers are checked against NULL and IS_ERR so this is safe to
 * call on a partially-initialized (or failed) registration. */
430 static void seq_server_proc_fini(struct lu_server_seq *seq)
433 if (seq->lss_proc_entry != NULL) {
434 if (!IS_ERR(seq->lss_proc_entry))
435 lprocfs_remove(seq->lss_proc_entry);
436 seq->lss_proc_entry = NULL;
439 if (seq->lss_proc_dir != NULL) {
440 if (!IS_ERR(seq->lss_proc_dir))
441 lprocfs_remove(seq->lss_proc_dir);
442 seq->lss_proc_dir = NULL;
/* Non-procfs build stubs (the #else branch of the LPROCFS guard --
 * the guard itself is in lines missing from this truncated view). */
447 static int seq_server_proc_init(struct lu_server_seq *seq)
452 static void seq_server_proc_fini(struct lu_server_seq *seq)
/* Initialize a sequence server/controller instance: set widths and name,
 * pin the backing dt_device, read persisted state (tolerating -ENODATA
 * on first run), register proc entries, and start the ptlrpc service.
 * NOTE(review): truncated view -- ENTRY/RETURN, several assignments
 * (e.g. lss_dev = dev), error GOTOs and the cleanup label are in lines
 * missing from this chunk; annotate only, do not treat as compilable. */
458 int seq_server_init(struct lu_server_seq *seq,
459 struct dt_device *dev,
461 enum lu_mgr_type type,
462 const struct lu_context *ctx)
/* Server vs controller selects the request portal and naming below. */
464 int is_srv = type == LUSTRE_SEQ_SERVER;
466 int rc, portal = is_srv ?
467 SEQ_SERVER_PORTAL : SEQ_CONTROLLER_PORTAL;
469 struct ptlrpc_service_conf seq_conf = {
470 .psc_nbufs = MDS_NBUFS,
471 .psc_bufsize = MDS_BUFSIZE,
472 .psc_max_req_size = SEQ_MAXREQSIZE,
473 .psc_max_reply_size = SEQ_MAXREPSIZE,
474 .psc_req_portal = portal,
475 .psc_rep_portal = MDC_REPLY_PORTAL,
476 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
477 .psc_num_threads = SEQ_NUM_THREADS,
478 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
482 LASSERT(dev != NULL);
483 LASSERT(uuid != NULL);
487 seq->lss_type = type;
488 sema_init(&seq->lss_sem, 1);
/* Allocation granularities for super- and meta-sequence ranges. */
490 seq->lss_super_width = LUSTRE_SEQ_SUPER_WIDTH;
491 seq->lss_meta_width = LUSTRE_SEQ_META_WIDTH;
493 snprintf(seq->lss_name, sizeof(seq->lss_name), "%s-%s-%s",
494 LUSTRE_SEQ_NAME, (is_srv ? "srv" : "ctl"),
/* Start from the full space; lss_super stays zero until either the
 * store is read back or a controller client attaches. */
497 seq->lss_space = LUSTRE_SEQ_SPACE_RANGE;
498 seq->lss_super = LUSTRE_SEQ_ZERO_RANGE;
/* Pin the backing device for the lifetime of this seq instance
 * (released in seq_server_fini()). */
500 lu_device_get(&seq->lss_dev->dd_lu_dev);
502 rc = seq_store_init(seq, ctx);
506 /* request backing store for saved sequence info */
507 rc = seq_store_read(seq, ctx);
508 if (rc == -ENODATA) {
/* First run: no persisted state is not an error. */
509 CDEBUG(D_INFO|D_WARNING, "%s: no data on "
510 "storage was found, %s\n", seq->lss_name,
511 is_srv ? "wait for controller attach" :
512 "this is first controller run");
514 CERROR("can't read sequence state, rc = %d\n",
519 rc = seq_server_proc_init(seq);
523 seq->lss_service = ptlrpc_init_svc_conf(&seq_conf,
528 if (seq->lss_service != NULL)
529 rc = ptlrpc_start_threads(NULL, seq->lss_service,
/* Error path: full teardown via seq_server_fini() (label in
 * missing lines). */
538 seq_server_fini(seq, ctx);
540 CDEBUG(D_INFO|D_WARNING, "%s Sequence Manager\n",
541 (is_srv ? "Server" : "Controller"));
545 EXPORT_SYMBOL(seq_server_init);
/* Tear down a sequence server: stop the ptlrpc service, remove proc
 * entries, finalize the backing store, and drop the device reference
 * taken in seq_server_init().  Safe against partial initialization
 * (NULL checks on service and device).  NOTE(review): truncated view --
 * ENTRY/EXIT and some assignments are in missing lines. */
547 void seq_server_fini(struct lu_server_seq *seq,
548 const struct lu_context *ctx)
552 if (seq->lss_service != NULL) {
553 ptlrpc_unregister_service(seq->lss_service);
554 seq->lss_service = NULL;
557 seq_server_proc_fini(seq);
558 seq_store_fini(seq, ctx);
560 if (seq->lss_dev != NULL) {
561 lu_device_put(&seq->lss_dev->dd_lu_dev);
567 EXPORT_SYMBOL(seq_server_fini);
/* Module bookkeeping: fid_init/fid_fini and the module entry/exit
 * points.  NOTE(review): truncated view -- the function bodies (cache
 * setup, lu_context_key registration, RETURNs) are in missing lines. */
569 static int fid_init(void)
575 static int fid_fini(void)
581 static int __init fid_mod_init(void)
583 /* init caches if any */
588 static void __exit fid_mod_exit(void)
590 /* free caches if any */
595 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
596 MODULE_DESCRIPTION("Lustre FID Module");
597 MODULE_LICENSE("GPL");
599 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);