1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
51 /* Sequence space. It starts from 0x400 so that the first 0x400 sequences
52 * stay reserved for special purposes. Exported for use by other modules. */
53 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
57 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
59 /* Zero range, used for init and other purposes. Exported for use by other modules. */
60 const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
64 EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
/*
 * Save the state of the server sequence manager @seq to the backing store.
 * @ctx is the lu_context supplied by the caller.
 *
 * NOTE(review): according to the XXX note below, the dt_device calls that
 * would really persist the state are still to be written.
 */
67 seq_server_write_state(struct lu_server_seq *seq,
68 const struct lu_context *ctx)
73 /* XXX: here should be calling struct dt_device methods to write
74 * sequence state to backing store. */
/*
 * Load the state of the server sequence manager @seq from the backing store.
 * @ctx is the lu_context supplied by the caller.
 *
 * seq_server_init() treats a return value of -ENODATA as: nothing was found
 * on disk and the server waits for a controller to be assigned.
 *
 * NOTE(review): according to the XXX note below, the dt_device calls that
 * would really read the state are still to be written.
 */
80 seq_server_read_state(struct lu_server_seq *seq,
81 const struct lu_context *ctx)
86 /* XXX: here should be calling struct dt_device methods to read the
87 * sequence state from backing store. */
92 /* assigns client to sequence controller node */
/*
 * @seq: server sequence manager that receives the controller client
 * @cli: client sequence manager talking to the controller; must not be NULL
 * @ctx: context passed on to seq_server_write_state()
 *
 * When the super-sequence of @seq is still the zero range, a new one is
 * requested with seq_client_alloc_super(), copied from cli->seq_range and
 * then saved with seq_server_write_state().
 */
94 seq_server_set_ctlr(struct lu_server_seq *seq,
95 struct lu_client_seq *cli,
96 const struct lu_context *ctx)
101 LASSERT(cli != NULL);
/* complain when a controller has already been assigned */
104 CERROR("SEQ-MGR(srv): sequence-controller "
105 "is already assigned\n");
109 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): assign "
110 "sequence controller client %s\n",
111 cli->seq_exp->exp_client_uuid.uuid);
115 /* assign controller */
118 /* get new range from controller only if super-sequence is not yet
119 * initialized from backing store or something else. */
120 if (range_is_zero(&seq->seq_super)) {
121 /* release sema to avoid deadlock for case we're asking our
124 rc = seq_client_alloc_super(cli);
128 CERROR("can't allocate super-sequence, "
133 /* take super-seq from client seq mgr */
134 LASSERT(range_is_sane(&cli->seq_range));
136 seq->seq_super = cli->seq_range;
138 /* save init seq to backing store. */
139 rc = seq_server_write_state(seq, ctx);
141 CERROR("can't write sequence state, "
148 EXPORT_SYMBOL(seq_server_set_ctlr);
150 /* on controller node, allocate new super sequence for regular sequence
/*
 * Carve a super-sequence of seq->seq_super_width sequences out of
 * seq->seq_space into @range, then save the state with
 * seq_server_write_state().
 *
 * NOTE(review): the test for less than one super width of space comes
 * before the range_is_exhausted() test. If range_is_exhausted() means that
 * no space is left, an empty space already satisfies the first test (for a
 * non-zero width), so the exhausted branch would never run. Confirm against
 * the definitions of range_space() and range_is_exhausted().
 */
153 __seq_server_alloc_super(struct lu_server_seq *seq,
154 struct lu_range *range,
155 const struct lu_context *ctx)
157 struct lu_range *space = &seq->seq_space;
161 LASSERT(range_is_sane(space));
/* less than one full super width is left in the space */
163 if (range_space(space) < seq->seq_super_width) {
164 CWARN("sequences space is going to exhaust soon. "
165 "Only can allocate "LPU64" sequences\n",
166 space->lr_end - space->lr_start);
/* the space start is moved to its end, so nothing remains afterwards */
168 space->lr_start = space->lr_end;
170 } else if (range_is_exhausted(space)) {
171 CERROR("sequences space is exhausted\n");
/* regular case: take a full super width from the space */
174 range_alloc(range, space, seq->seq_super_width);
178 rc = seq_server_write_state(seq, ctx);
180 CERROR("can't save state, rc = %d\n",
185 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated super-sequence "
186 "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
/*
 * Wrapper around __seq_server_alloc_super(); arguments are passed through
 * unchanged.
 *
 * NOTE(review): presumably the call is serialized with seq->seq_sem; the
 * locking lines are not visible here, confirm.
 */
193 seq_server_alloc_super(struct lu_server_seq *seq,
194 struct lu_range *range,
195 const struct lu_context *ctx)
201 rc = __seq_server_alloc_super(seq, range, ctx);
/*
 * Carve a meta-sequence of seq->seq_meta_width sequences out of the current
 * super-sequence (seq->seq_super) into @range and save the state with
 * seq_server_write_state().
 *
 * When the super-sequence is exhausted, a new one is first requested from
 * the controller through seq->seq_cli. It is accepted only if it starts
 * after the current super-sequence start.
 */
208 __seq_server_alloc_meta(struct lu_server_seq *seq,
209 struct lu_range *range,
210 const struct lu_context *ctx)
212 struct lu_range *super = &seq->seq_super;
216 LASSERT(range_is_sane(super));
218 /* XXX: here we should avoid cascading RPCs using kind of async
219 * preallocation when meta-sequence is close to exhausting. */
220 if (range_is_exhausted(super)) {
222 CERROR("no seq-controller client is setup\n");
226 /* allocate new super-sequence. */
228 rc = seq_client_alloc_super(seq->seq_cli);
231 CERROR("can't allocate new super-sequence, "
/* accept the range from the controller only if it lies ahead of ours */
236 if (seq->seq_cli->seq_range.lr_start > super->lr_start) {
237 /* saving new range into allocation space. */
238 *super = seq->seq_cli->seq_range;
239 LASSERT(range_is_sane(super));
241 /* XXX: race is caught, ignore what we have from
242 * controller node. The only issue is that controller
243 * node has now this super-sequence lost, what makes
244 * sequences space smaller. */
/* NOTE(review): the message below says cached where caught is presumably meant */
245 CWARN("SEQ-MGR(srv): race is cached, reject "
246 "allocated super-sequence\n");
250 range_alloc(range, super, seq->seq_meta_width);
252 rc = seq_server_write_state(seq, ctx);
254 CERROR("can't save state, rc = %d\n",
259 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated meta-sequence "
260 "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
/*
 * Wrapper around __seq_server_alloc_meta(); arguments are passed through
 * unchanged.
 *
 * NOTE(review): presumably the call is serialized with seq->seq_sem; the
 * locking lines are not visible here, confirm.
 */
267 seq_server_alloc_meta(struct lu_server_seq *seq,
268 struct lu_range *range,
269 const struct lu_context *ctx)
275 rc = __seq_server_alloc_meta(seq, range, ctx);
/*
 * Dispatch one sequence request: SEQ_ALLOC_SUPER goes to
 * seq_server_alloc_super(), the other visible branch goes to
 * seq_server_alloc_meta(). The allocated range is stored in @range.
 *
 * NOTE(review): the opcode parameter and the label of the second case are
 * not visible here; presumably SEQ_ALLOC_META, confirm.
 */
282 seq_server_handle(struct lu_server_seq *seq,
283 const struct lu_context *ctx,
284 struct lu_range *range,
291 case SEQ_ALLOC_SUPER:
292 rc = seq_server_alloc_super(seq, range, ctx);
295 rc = seq_server_alloc_meta(seq, range, ctx);
/*
 * Unpack one RQF_SEQ_QUERY request with a request capsule, run it through
 * seq_server_handle() and leave the resulting range in the reply buffer.
 *
 * @ctx: context passed on to seq_server_handle()
 * @seq: server sequence manager that serves the request
 * @req: incoming ptlrpc request
 *
 * The capsule is released with req_capsule_fini() before returning.
 */
306 seq_req_handle0(const struct lu_context *ctx,
307 struct lu_server_seq *seq,
308 struct ptlrpc_request *req)
310 int rep_buf_size[2] = { 0, };
311 struct req_capsule pill;
312 struct lu_range *out;
317 req_capsule_init(&pill, req, RCL_SERVER,
320 req_capsule_set(&pill, &RQF_SEQ_QUERY);
321 req_capsule_pack(&pill);
/* opcode comes from the client part of the message */
323 opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
/* the range buffer lives in the server (reply) part */
325 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
327 CERROR("can't get range buffer\n");
328 GOTO(out_pill, rc= -EPROTO);
330 rc = seq_server_handle(seq, ctx, out, *opc);
332 CERROR("cannot unpack client request\n");
337 req_capsule_fini(&pill);
/*
 * Service entry point for sequence requests.
 *
 * A SEQ_QUERY request that arrives on a connected export is handed to
 * seq_req_handle0() together with the server sequence manager of the site
 * the export device belongs to. A request without an export gets
 * -ENOTCONN; any other opcode gets -ENOTSUPP through ptlrpc_error().
 * The reply is sent with target_send_reply().
 */
342 seq_req_handle(struct ptlrpc_request *req)
344 int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
345 const struct lu_context *ctx;
346 struct lu_site *site;
350 OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
/* the context is taken from the service thread that runs this request */
352 ctx = req->rq_svc_thread->t_ctx;
353 LASSERT(ctx != NULL);
354 LASSERT(ctx->lc_thread == req->rq_svc_thread);
355 if (req->rq_reqmsg->opc == SEQ_QUERY) {
356 if (req->rq_export != NULL) {
357 struct obd_device *obd;
/* walk export -> obd device -> lu device -> site */
359 obd = req->rq_export->exp_obd;
360 site = obd->obd_lu_dev->ld_site;
361 LASSERT(site != NULL);
363 rc = seq_req_handle0(ctx, site->ls_server_seq, req);
365 CERROR("Unconnected request\n");
366 req->rq_status = -ENOTCONN;
367 GOTO(out, rc = -ENOTCONN);
370 CERROR("Wrong opcode: %d\n",
371 req->rq_reqmsg->opc);
372 req->rq_status = -ENOTSUPP;
373 rc = ptlrpc_error(req);
379 target_send_reply(req, rc, fail);
/*
 * Register the procfs entries of @seq: a directory named after
 * seq->seq_name, an entry called services, and the variables listed in
 * seq_server_proc_list.
 *
 * A failed registration is reported through PTR_ERR() of the returned
 * pointer. The visible cleanup code removes seq_proc_dir and resets both
 * stored pointers to NULL.
 */
385 seq_server_proc_init(struct lu_server_seq *seq)
390 seq->seq_proc_dir = lprocfs_register(seq->seq_name,
393 if (IS_ERR(seq->seq_proc_dir)) {
394 CERROR("LProcFS failed in seq-init\n");
395 rc = PTR_ERR(seq->seq_proc_dir);
399 seq->seq_proc_entry = lprocfs_register("services",
402 if (IS_ERR(seq->seq_proc_entry)) {
403 CERROR("LProcFS failed in seq-init\n");
404 rc = PTR_ERR(seq->seq_proc_entry);
408 rc = lprocfs_add_vars(seq->seq_proc_dir,
409 seq_server_proc_list, seq);
411 CERROR("can't init sequence manager "
412 "proc, rc %d\n", rc);
/* cleanup: drop the directory and forget both pointers */
418 lprocfs_remove(seq->seq_proc_dir);
420 seq->seq_proc_dir = NULL;
421 seq->seq_proc_entry = NULL;
/*
 * Remove the procfs entries of @seq. Each pointer is checked before removal
 * and set to NULL afterwards, so calling this more than once is harmless.
 */
426 seq_server_proc_fini(struct lu_server_seq *seq)
429 if (seq->seq_proc_entry) {
430 lprocfs_remove(seq->seq_proc_entry);
431 seq->seq_proc_entry = NULL;
434 if (seq->seq_proc_dir) {
435 lprocfs_remove(seq->seq_proc_dir);
436 seq->seq_proc_dir = NULL;
/*
 * Set up the server sequence manager @seq.
 *
 * @seq:  manager to initialize
 * @dev:  backing dt_device; must not be NULL
 * @uuid: must not be NULL; used to build the name LUSTRE_SEQ_NAME-uuid
 * @ctx:  context passed on to seq_server_read_state()
 *
 * Steps visible here: initialize the semaphore and the widths, name the
 * manager, reset the space and super-sequence ranges, take a reference on
 * the device, read the saved state, register procfs entries, then create
 * the ptlrpc service and start its threads.
 */
443 seq_server_init(struct lu_server_seq *seq,
444 struct dt_device *dev,
446 const struct lu_context *ctx)
/* service parameters reuse the MDS buffer and request sizes */
449 struct ptlrpc_service_conf seq_conf = {
450 .psc_nbufs = MDS_NBUFS,
451 .psc_bufsize = MDS_BUFSIZE,
452 .psc_max_req_size = MDS_MAXREQSIZE,
453 .psc_max_reply_size = MDS_MAXREPSIZE,
454 .psc_req_portal = MDS_SEQ_PORTAL,
455 .psc_rep_portal = MDC_REPLY_PORTAL,
456 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
457 .psc_num_threads = SEQ_NUM_THREADS
461 LASSERT(dev != NULL);
462 LASSERT(uuid != NULL);
466 sema_init(&seq->seq_sem, 1);
468 seq->seq_super_width = LUSTRE_SEQ_SUPER_WIDTH;
469 seq->seq_meta_width = LUSTRE_SEQ_META_WIDTH;
471 snprintf(seq->seq_name, sizeof(seq->seq_name),
472 "%s-%s", LUSTRE_SEQ_NAME, uuid);
/* start from the full space and a zero super-sequence */
474 seq->seq_space = LUSTRE_SEQ_SPACE_RANGE;
475 seq->seq_super = LUSTRE_SEQ_ZERO_RANGE;
/* reference on the device; seq_server_fini() drops it with lu_device_put() */
477 lu_device_get(&seq->seq_dev->dd_lu_dev);
479 /* request backing store for saved sequence info */
480 rc = seq_server_read_state(seq, ctx);
/* -ENODATA is only logged as a debug message, other errors are reported */
481 if (rc == -ENODATA) {
482 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): no data on "
483 "disk found, waiting for controller assign\n");
485 CERROR("can't read sequence state, rc = %d\n",
491 rc = seq_server_proc_init(seq);
496 seq->seq_service = ptlrpc_init_svc_conf(&seq_conf,
501 if (seq->seq_service != NULL)
502 rc = ptlrpc_start_threads(NULL, seq->seq_service,
/* seq_server_fini() calls seq_server_proc_fini() again; the second call
 * finds both procfs pointers already NULL */
512 seq_server_proc_fini(seq);
514 seq_server_fini(seq, ctx);
516 CDEBUG(D_INFO|D_WARNING, "Server Sequence "
521 EXPORT_SYMBOL(seq_server_init);
/*
 * Tear down the server sequence manager @seq: unregister the ptlrpc service
 * if one exists, remove the procfs entries and drop the device reference.
 * @ctx is the lu_context supplied by the caller.
 */
524 seq_server_fini(struct lu_server_seq *seq,
525 const struct lu_context *ctx)
529 if (seq->seq_service != NULL) {
530 ptlrpc_unregister_service(seq->seq_service);
531 seq->seq_service = NULL;
535 seq_server_proc_fini(seq);
/* release the reference on the backing device, if one is set */
538 if (seq->seq_dev != NULL) {
539 lu_device_put(&seq->seq_dev->dd_lu_dev);
543 CDEBUG(D_INFO|D_WARNING, "Server Sequence "
547 EXPORT_SYMBOL(seq_server_fini);
/* NOTE(review): only the signature is visible here; presumably the
 * module-wide setup step, confirm against the body. */
549 static int fid_init(void)
/* NOTE(review): only the signature is visible here; presumably the
 * module-wide cleanup step, confirm against the body. */
555 static int fid_fini(void)
/* Module load hook; registered through cfs_module() at the end of the file. */
562 __init fid_mod_init(void)
565 /* init caches if any */
/* Module unload hook; registered through cfs_module() at the end of the file. */
571 __exit fid_mod_exit(void)
573 /* free caches if any */
578 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
579 MODULE_DESCRIPTION("Lustre FID Module");
580 MODULE_LICENSE("GPL");
582 cfs_module(fid, "0.0.4", fid_mod_init, fid_mod_exit);