1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
51 /* sequence space, starts from 0x400 to have first 0x400 sequences used for
52 * special purposes. */
53 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
/* NOTE(review): initializer body (orig. lines 54-56) is elided from this
 * excerpt -- presumably it sets lr_start = 0x400 and lr_end to the upper
 * bound of the sequence space; confirm against the full source. */
57 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
59 /* zero range, used for init and other purposes */
60 const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
/* NOTE(review): initializer body (orig. lines 61-63) is elided from this
 * excerpt -- presumably both lr_start and lr_end are 0, which is what
 * range_is_zero() tests for below; confirm against the full source. */
64 EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
66 /* assigns client to sequence controller node */
/*
 * seq_server_set_cli() - attach (cli != NULL) or detach a controller client
 * to/from this server-side sequence manager.
 *
 * NOTE(review): this excerpt has gaps in the original numbering -- the
 * opening brace, local declarations, locking, the detach/attach branch
 * structure and the error-unwind paths are not visible here.  Comments
 * below describe only what the visible lines establish.
 */
67 int seq_server_set_cli(struct lu_server_seq *seq,
68 struct lu_client_seq *cli,
69 const struct lu_context *ctx)
/* Detach path: log when the current controller client is dropped. */
75 CDEBUG(D_INFO|D_WARNING, "%s: detached "
76 "sequence mgr client %s\n", seq->seq_name,
77 cli->seq_exp->exp_client_uuid.uuid);
/* A sequence-controller client may be attached only once. */
83 CERROR("%s: sequence-controller is already "
84 "assigned\n", seq->seq_name);
88 CDEBUG(D_INFO|D_WARNING, "%s: attached "
89 "sequence client %s\n", seq->seq_name,
90 cli->seq_exp->exp_client_uuid.uuid);
92 /* asking client for new range, assign that range to ->seq_super and
93 * write seq state to backing store should be atomic. */
96 /* assign controller */
99 /* get new range from controller only if super-sequence is not yet
100 * initialized from backing store or something else. */
101 if (range_is_zero(&seq->seq_super)) {
102 rc = seq_client_alloc_super(cli);
/* error branch (body elided in this excerpt, orig. lines 103-108) */
104 CERROR("can't allocate super-sequence, "
109 /* take super-seq from client seq mgr */
110 LASSERT(range_is_sane(&cli->seq_range));
112 seq->seq_super = cli->seq_range;
114 /* save init seq to backing store. */
115 rc = seq_store_write(seq, ctx);
/* write-failure branch (remainder elided in this excerpt) */
117 CERROR("can't write sequence state, "
127 EXPORT_SYMBOL(seq_server_set_cli);
129 /* on controller node, allocate new super sequence for regular sequence
/*
 * __seq_server_alloc_super() - carve a super-width range for a server node
 * out of the controller's global sequence space, then persist the state.
 *
 * NOTE(review): gaps in the original numbering -- prologue (ENTRY, rc
 * declaration), branch closers and RETURN lines are elided here.
 */
131 static int __seq_server_alloc_super(struct lu_server_seq *seq,
132 struct lu_range *range,
133 const struct lu_context *ctx)
135 struct lu_range *space = &seq->seq_space;
139 LASSERT(range_is_sane(space));
/* Warn when less than one super-width of sequences remains. */
141 if (range_space(space) < seq->seq_super_width) {
142 CWARN("sequences space is going to exhaust soon. "
143 "Only can allocate "LPU64" sequences\n",
144 space->lr_end - space->lr_start);
/* hand out whatever is left; the space becomes empty afterwards */
146 space->lr_start = space->lr_end;
/* NOTE(review): an exhausted space (range_space() == 0) also satisfies
 * the first test above, which would make this else-if unreachable --
 * verify branch ordering against the full source (elided lines may
 * change the picture). */
148 } else if (range_is_exhausted(space)) {
149 CERROR("sequences space is exhausted\n");
/* normal case: allocate seq_super_width sequences for the caller */
152 range_alloc(range, space, seq->seq_super_width);
/* persist the shrunken space before handing the range out */
157 rc = seq_store_write(seq, ctx);
159 CERROR("can't save state, rc = %d\n",
165 CDEBUG(D_INFO, "%s: allocated super-sequence "
166 "["LPX64"-"LPX64"]\n", seq->seq_name,
167 range->lr_start, range->lr_end);
/* Serialized entry point for super-sequence allocation.  The seq_sem
 * down/up and ENTRY/RETURN lines are elided from this excerpt (numbering
 * gaps) -- presumably this wrapper takes seq->seq_sem around the call;
 * confirm against the full source. */
173 static int seq_server_alloc_super(struct lu_server_seq *seq,
174 struct lu_range *range,
175 const struct lu_context *ctx)
181 rc = __seq_server_alloc_super(seq, range, ctx);
/*
 * __seq_server_alloc_meta() - carve a meta-width range for a client out of
 * this server's super-sequence, refilling the super-sequence from the
 * controller (via seq->seq_cli) when it is exhausted.
 *
 * NOTE(review): gaps in the original numbering -- prologue, branch
 * closers and RETURN lines are elided from this excerpt.
 */
187 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
188 struct lu_range *range,
189 const struct lu_context *ctx)
191 struct lu_range *super = &seq->seq_super;
195 LASSERT(range_is_sane(super));
197 /* XXX: here we should avoid cascading RPCs using kind of async
198 * preallocation when meta-sequence is close to exhausting. */
199 if (range_is_exhausted(super)) {
/* no controller client attached -- cannot refill; error path elided */
201 CERROR("no seq-controller client is setup\n");
/* synchronous RPC to the controller for a fresh super-sequence */
205 rc = seq_client_alloc_super(seq->seq_cli);
207 CERROR("can't allocate new super-sequence, "
212 /* saving new range into allocation space. */
213 *super = seq->seq_cli->seq_range;
214 LASSERT(range_is_sane(super));
/* allocate seq_meta_width sequences from the (possibly refilled) super */
216 range_alloc(range, super, seq->seq_meta_width);
/* persist the updated super-sequence before returning the range */
218 rc = seq_store_write(seq, ctx);
220 CERROR("can't save state, rc = %d\n",
225 CDEBUG(D_INFO, "%s: allocated meta-sequence "
226 "["LPX64"-"LPX64"]\n", seq->seq_name,
227 range->lr_start, range->lr_end);
/* Serialized entry point for meta-sequence allocation.  The seq_sem
 * down/up and ENTRY/RETURN lines are elided from this excerpt (numbering
 * gaps) -- presumably symmetrical to seq_server_alloc_super(); confirm
 * against the full source. */
233 static int seq_server_alloc_meta(struct lu_server_seq *seq,
234 struct lu_range *range,
235 const struct lu_context *ctx)
241 rc = __seq_server_alloc_meta(seq, range, ctx);
/*
 * seq_req_handle0() - body of the SEQ_QUERY request handler: unpack the
 * opcode, dispatch to meta- or super-sequence allocation, and return the
 * allocated range in the reply buffer.
 *
 * NOTE(review): gaps in the original numbering -- the switch statement
 * opener, several closing braces, ENTRY/RETURN and the out_pill label are
 * elided from this excerpt.
 */
247 static int seq_req_handle0(const struct lu_context *ctx,
248 struct ptlrpc_request *req)
250 int rep_buf_size[2] = { 0, };
251 struct req_capsule pill;
252 struct lu_site *site;
253 struct lu_range *out;
258 site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
259 LASSERT(site != NULL);
/* set up and pack the reply capsule for the SEQ_QUERY layout */
261 req_capsule_init(&pill, req, RCL_SERVER,
264 req_capsule_set(&pill, &RQF_SEQ_QUERY);
265 req_capsule_pack(&pill);
267 opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
269 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
271 CERROR("can't get range buffer\n");
272 GOTO(out_pill, rc= -EPROTO);
277 if (!site->ls_server_seq) {
278 CERROR("sequence-server is not initialized\n");
/* BUG(review): '==' compares instead of assigns -- should be
 * 'GOTO(out_pill, rc = -EINVAL)'.  As written rc keeps its prior
 * value and the comparison result is discarded. */
279 GOTO(out_pill, rc == -EINVAL);
281 rc = seq_server_alloc_meta(site->ls_server_seq, out, ctx);
283 case SEQ_ALLOC_SUPER:
284 if (!site->ls_control_seq) {
285 CERROR("sequence-controller is not initialized\n");
/* BUG(review): same '==' vs '=' defect as above -- should be
 * 'GOTO(out_pill, rc = -EINVAL)'. */
286 GOTO(out_pill, rc == -EINVAL);
288 rc = seq_server_alloc_super(site->ls_control_seq, out, ctx);
/* default case: unknown opcode from the client */
291 CERROR("wrong opc 0x%x\n", *opc);
/* reached when req_capsule_client_get() returned NULL (elided branch) */
295 CERROR("cannot unpack client request\n");
300 req_capsule_fini(&pill);
/*
 * seq_req_handle() - top-level ptlrpc handler for the sequence service:
 * validates the request, dispatches SEQ_QUERY to seq_req_handle0() and
 * sends the reply.
 *
 * NOTE(review): gaps in the original numbering -- rc declaration, ENTRY,
 * else-branch braces and the final RETURN are elided from this excerpt.
 */
304 static int seq_req_handle(struct ptlrpc_request *req)
306 int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
307 const struct lu_context *ctx;
/* fault-injection hook: silently drop the reply once when armed */
311 OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
313 ctx = req->rq_svc_thread->t_ctx;
314 LASSERT(ctx != NULL);
315 LASSERT(ctx->lc_thread == req->rq_svc_thread);
316 if (req->rq_reqmsg->opc == SEQ_QUERY) {
/* only connected clients (with an export) may query sequences */
317 if (req->rq_export != NULL) {
318 rc = seq_req_handle0(ctx, req);
320 CERROR("Unconnected request\n");
321 req->rq_status = -ENOTCONN;
/* unsupported opcode: report -ENOTSUPP back to the client */
324 CERROR("Wrong opcode: %d\n",
325 req->rq_reqmsg->opc);
326 req->rq_status = -ENOTSUPP;
327 rc = ptlrpc_error(req);
331 target_send_reply(req, rc, fail);
/*
 * seq_server_proc_init() - register the /proc tree for this sequence
 * manager: a seq_name directory, a "services" subdirectory, and the
 * seq_server_proc_list variables.
 *
 * NOTE(review): gaps in the original numbering -- trailing arguments of
 * the lprocfs_register() calls, the goto labels for the error unwind,
 * and RETURN lines are elided from this excerpt.
 */
336 static int seq_server_proc_init(struct lu_server_seq *seq)
341 seq->seq_proc_dir = lprocfs_register(seq->seq_name,
344 if (IS_ERR(seq->seq_proc_dir)) {
345 CERROR("LProcFS failed in seq-init\n")
346 rc = PTR_ERR(seq->seq_proc_dir);
350 seq->seq_proc_entry = lprocfs_register("services",
353 if (IS_ERR(seq->seq_proc_entry)) {
354 CERROR("LProcFS failed in seq-init\n");
355 rc = PTR_ERR(seq->seq_proc_entry);
359 rc = lprocfs_add_vars(seq->seq_proc_dir,
360 seq_server_proc_list, seq);
362 CERROR("can't init sequence manager "
363 "proc, rc %d\n", rc);
/* error unwind: tear down whatever was registered (labels elided) */
370 lprocfs_remove(seq->seq_proc_entry);
372 lprocfs_remove(seq->seq_proc_dir);
374 seq->seq_proc_dir = NULL;
375 seq->seq_proc_entry = NULL;
/* Unregister the /proc entries created by seq_server_proc_init() and
 * NULL the pointers so a repeated fini is harmless.  (Opening brace,
 * closers and EXIT are elided from this excerpt -- numbering gaps.) */
379 static void seq_server_proc_fini(struct lu_server_seq *seq)
382 if (seq->seq_proc_entry) {
383 lprocfs_remove(seq->seq_proc_entry);
384 seq->seq_proc_entry = NULL;
387 if (seq->seq_proc_dir) {
388 lprocfs_remove(seq->seq_proc_dir);
389 seq->seq_proc_dir = NULL;
/*
 * seq_server_init() - initialize a server-side sequence manager of the
 * given type (LUSTRE_SEQ_SRV or controller), load saved state from the
 * backing store, set up /proc and start the ptlrpc service.
 *
 * NOTE(review): gaps in the original numbering -- among the elided lines
 * is a parameter (orig. line 397, presumably 'const char *uuid', used by
 * the LASSERT and snprintf below -- confirm), plus seq->seq_dev
 * assignment, error unwinds, and the ptlrpc_init_svc_conf() trailing
 * arguments.
 */
395 int seq_server_init(struct lu_server_seq *seq,
396 struct dt_device *dev,
398 lu_server_type_t type,
399 const struct lu_context *ctx)
/* SRV and controller instances listen on different request portals */
401 int rc, portal = (type == LUSTRE_SEQ_SRV) ?
402 SEQ_SRV_PORTAL : SEQ_CTLR_PORTAL;
404 struct ptlrpc_service_conf seq_conf = {
405 .psc_nbufs = MDS_NBUFS,
406 .psc_bufsize = MDS_BUFSIZE,
407 .psc_max_req_size = MDS_MAXREQSIZE,
408 .psc_max_reply_size = MDS_MAXREPSIZE,
409 .psc_req_portal = portal,
410 .psc_rep_portal = MDC_REPLY_PORTAL,
411 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
412 .psc_num_threads = SEQ_NUM_THREADS,
413 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
417 LASSERT(dev != NULL);
418 LASSERT(uuid != NULL);
422 seq->seq_type = type;
423 sema_init(&seq->seq_sem, 1);
425 seq->seq_super_width = LUSTRE_SEQ_SUPER_WIDTH;
426 seq->seq_meta_width = LUSTRE_SEQ_META_WIDTH;
/* e.g. "seq-srv-<uuid>" / "seq-ctl-<uuid>" (third arg elided here) */
428 snprintf(seq->seq_name, sizeof(seq->seq_name), "%s-%s-%s",
429 LUSTRE_SEQ_NAME, (type == LUSTRE_SEQ_SRV ? "srv" : "ctl"),
/* start from the full space; super stays zero until loaded or attached */
432 seq->seq_space = LUSTRE_SEQ_SPACE_RANGE;
433 seq->seq_super = LUSTRE_SEQ_ZERO_RANGE;
/* pin the backing dt device for the lifetime of this seq manager */
435 lu_device_get(&seq->seq_dev->dd_lu_dev);
437 rc = seq_store_init(seq, ctx);
441 /* request backing store for saved sequence info */
442 rc = seq_store_read(seq, ctx);
/* -ENODATA means a fresh store: expected on first run / before attach */
443 if (rc == -ENODATA) {
444 if (type == LUSTRE_SEQ_SRV) {
445 CDEBUG(D_INFO|D_WARNING, "%s: no data on "
446 "disk found, wait for controller "
447 "attach\n", seq->seq_name);
449 CDEBUG(D_INFO|D_WARNING, "%s: no data on "
450 "disk found, this is first controller "
451 "run\n", seq->seq_name);
454 CERROR("can't read sequence state, rc = %d\n",
460 rc = seq_server_proc_init(seq);
465 seq->seq_service = ptlrpc_init_svc_conf(&seq_conf,
470 if (seq->seq_service != NULL)
471 rc = ptlrpc_start_threads(NULL, seq->seq_service,
/* error unwind: seq_server_fini() tears down partial initialization */
480 seq_server_fini(seq, ctx);
482 CDEBUG(D_INFO|D_WARNING, "%s Sequence Manager\n",
483 (type == LUSTRE_SEQ_SRV ? "Server" : "Controller"));
487 EXPORT_SYMBOL(seq_server_init);
/*
 * seq_server_fini() - tear down a sequence manager in reverse order of
 * seq_server_init(): stop the ptlrpc service, remove /proc entries, close
 * the backing store and drop the dt device reference.  NULLs each pointer
 * so it is safe to call on a partially-initialized seq (used as the error
 * unwind in seq_server_init()).  (Opening brace, seq->seq_dev = NULL and
 * EXIT are elided from this excerpt -- numbering gaps.)
 */
489 void seq_server_fini(struct lu_server_seq *seq,
490 const struct lu_context *ctx)
494 if (seq->seq_service != NULL) {
495 ptlrpc_unregister_service(seq->seq_service);
496 seq->seq_service = NULL;
500 seq_server_proc_fini(seq);
503 seq_store_fini(seq, ctx);
505 if (seq->seq_dev != NULL) {
506 lu_device_put(&seq->seq_dev->dd_lu_dev);
512 EXPORT_SYMBOL(seq_server_fini);
/* Module init/fini stubs.  NOTE(review): all four function bodies are
 * elided from this excerpt (numbering gaps) -- presumably fid_init()/
 * fid_fini() just RETURN(0) and the __init/__exit hooks delegate to them;
 * confirm against the full source. */
514 static int fid_init(void)
520 static int fid_fini(void)
526 static int __init fid_mod_init(void)
528 /* init caches if any */
533 static void __exit fid_mod_exit(void)
535 /* free caches if any */
540 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
541 MODULE_DESCRIPTION("Lustre FID Module");
542 MODULE_LICENSE("GPL");
/* register module entry/exit points via the libcfs wrapper macro */
544 cfs_module(fid, "0.1.0", fid_mod_init, fid_mod_exit);