1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_fid.h>
47 #include "fid_internal.h"
49 /* client seq mgr interface */
/*
 * Common helper for the client-side sequence allocation RPCs: packs a
 * SEQ_QUERY request with two buffers (operation code, lu_range), sends it
 * synchronously to the sequence server, then unpacks/swabs the allocated
 * range from the reply.
 *
 * NOTE(review): this extract is truncated -- the return type, opening and
 * closing braces, several error checks and the RETURN path are not visible
 * here; comments describe only what the visible lines show.
 */
51 seq_client_alloc_common(struct lu_client_seq *seq,
52 struct lu_range *seq_ran,
56 struct lu_range *range;
57 struct ptlrpc_request *req;
58 int ran_size = sizeof(*range);
59 int rc, size[] = {sizeof(*op), ran_size};
60 int repsize[] = {ran_size};
/* build the request against the import of this client's export */
63 req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp),
64 LUSTRE_MDS_VERSION, SEQ_QUERY,
/* request buffer 0 = opcode, buffer 1 = range payload */
69 op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
72 range = lustre_msg_buf(req->rq_reqmsg, 1, ran_size);
75 req->rq_replen = lustre_msg_size(1, repsize);
76 req->rq_request_portal = MDS_SEQ_PORTAL;
/* synchronous RPC; rc checked in lines not visible in this extract */
77 rc = ptlrpc_queue_wait(req);
/* reply buffer 0 carries the allocated range; byte-swapped if needed */
81 range = lustre_swab_repbuf(req, 0, sizeof(*range),
82 lustre_swab_lu_range);
84 LASSERT(range != NULL);
87 ptlrpc_req_finished(req);
91 /* request sequence-controller node to allocate new super-sequence. */
/*
 * Only valid for a client-seq set up with LUSTRE_CLI_SEQ_SERVER (i.e. an
 * MDT talking to the sequence controller).  Delegates the RPC to
 * seq_client_alloc_common() and logs the received [start-end] range.
 * NOTE(review): extract truncated; return type/braces/error path not shown.
 */
93 seq_client_alloc_super(struct lu_client_seq *seq)
98 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
99 rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
102 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated super-sequence "
103 "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
104 seq->seq_cl_range.lr_end);
108 EXPORT_SYMBOL(seq_client_alloc_super);
110 /* request sequence-controller node to allocate new meta-sequence. */
/*
 * Counterpart of seq_client_alloc_super() for regular clients
 * (LUSTRE_CLI_SEQ_CLIENT): asks the server for a fresh meta-sequence range
 * and stores it in seq->seq_cl_range via seq_client_alloc_common().
 * NOTE(review): extract truncated; return type/braces/error path not shown.
 */
112 seq_client_alloc_meta(struct lu_client_seq *seq)
117 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
118 rc = seq_client_alloc_common(seq, &seq->seq_cl_range,
121 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated meta-sequence "
122 "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
123 seq->seq_cl_range.lr_end);
127 EXPORT_SYMBOL(seq_client_alloc_meta);
129 /* allocate new sequence for client (llite or MDC are expected to use this) */
/*
 * Hands out the next sequence number from the cached meta-sequence range,
 * refilling the range from the MDT (seq_client_alloc_meta) when it is
 * exhausted.  *seqnr receives the allocated sequence number.
 * NOTE(review): extract truncated; locking, braces and RETURN paths are
 * not visible here.
 */
131 seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
138 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
139 LASSERT(range_is_sane(&seq->seq_cl_range));
141 /* if we still have free sequences in meta-sequence we allocate new seq
142 * from given range. */
143 if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
/* fast path: take lr_start and advance it */
144 *seqnr = seq->seq_cl_range.lr_start;
145 seq->seq_cl_range.lr_start += 1;
148 /* meta-sequence is exhausted, request MDT to allocate new
149 * meta-sequence for us. */
150 rc = seq_client_alloc_meta(seq);
152 CERROR("can't allocate new meta-sequence, "
/* refill succeeded: allocate from the fresh range */
156 *seqnr = seq->seq_cl_range.lr_start;
157 seq->seq_cl_range.lr_start += 1;
162 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated sequence "
163 "["LPX64"]\n", *seqnr);
167 EXPORT_SYMBOL(seq_client_alloc_seq);
/*
 * Allocate the next FID for a client: bump the object id (f_oid) within
 * the current sequence while it stays below LUSTRE_SEQ_WIDTH; once the
 * sequence is used up, obtain a new sequence number via
 * seq_client_alloc_seq() and restart f_oid from LUSTRE_FID_INIT_OID.
 * NOTE(review): extract truncated -- the copy of seq->seq_fid into *fid
 * and the return path are not visible in this view.
 */
170 seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
175 LASSERT(fid != NULL);
176 LASSERT(fid_is_sane(&seq->seq_fid));
177 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
/* fast path: room left in the current sequence */
180 if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
182 seq->seq_fid.f_oid += 1;
/* slow path: current sequence exhausted, get a new one */
187 rc = seq_client_alloc_seq(seq, &seqnr);
189 CERROR("can't allocate new sequence, "
/* reset the cached fid to the start of the new sequence */
193 seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
194 seq->seq_fid.f_seq = seqnr;
195 seq->seq_fid.f_ver = 0;
198 seq->seq_fid.f_oid += 1;
201 LASSERT(fid_is_sane(fid));
203 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
211 EXPORT_SYMBOL(seq_client_alloc_fid);
/*
 * Initialize a client sequence manager bound to export @exp.
 * @flags must contain exactly the role: LUSTRE_CLI_SEQ_CLIENT (llite/MDC,
 * immediately allocates a first sequence + fid) or LUSTRE_CLI_SEQ_SERVER
 * (MDT talking to the controller, allocates a super-sequence instead).
 * Takes a reference on @exp (released in seq_client_fini()).
 * NOTE(review): extract truncated; braces, rc checks and the cleanup path
 * around seq_client_fini() are only partially visible.
 */
214 seq_client_init(struct lu_client_seq *seq,
215 struct obd_export *exp,
221 LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
222 LUSTRE_CLI_SEQ_SERVER));
224 seq->seq_flags = flags;
225 fid_zero(&seq->seq_fid);
226 sema_init(&seq->seq_sem, 1);
/* empty range (start == end) reads as "exhausted" below */
228 seq->seq_cl_range.lr_end = 0;
229 seq->seq_cl_range.lr_start = 0;
/* pin the export for the lifetime of this seq manager */
232 seq->seq_exp = class_export_get(exp);
234 if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
237 /* client (llite or MDC) init case, we need new sequence from
238 * MDT. This will allocate new meta-sequemce first, because seq
239 * range in init state and looks the same as exhausted. */
240 rc = seq_client_alloc_seq(seq, &seqnr);
242 CERROR("can't allocate new sequence, rc %d\n", rc);
245 seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
246 seq->seq_fid.f_seq = seqnr;
247 seq->seq_fid.f_ver = 0;
250 LASSERT(fid_is_sane(&seq->seq_fid));
252 /* check if this is controller node is trying to init client. */
254 /* MDT uses client seq manager to talk to sequence
255 * controller, and thus, we need super-sequence. */
256 rc = seq_client_alloc_super(seq);
/* error path: undo the partial init (presumably on rc != 0 -- the
* surrounding condition is not visible in this extract) */
265 seq_client_fini(seq);
267 CDEBUG(D_INFO, "Client Sequence Manager initialized\n");
270 EXPORT_SYMBOL(seq_client_init);
/*
 * Tear down a client sequence manager: drop the export reference taken in
 * seq_client_init().  Safe to call on a partially-initialized seq (the
 * export pointer is checked for NULL first).
 * NOTE(review): extract truncated; any seq_exp = NULL reset or further
 * cleanup after class_export_put() is not visible here.
 */
272 void seq_client_fini(struct lu_client_seq *seq)
275 if (seq->seq_exp != NULL) {
276 class_export_put(seq->seq_exp);
279 CDEBUG(D_INFO, "Client Sequence Manager finalized\n");
282 EXPORT_SYMBOL(seq_client_fini);
285 /* server side seq mgr stuff */
/*
 * Initial value for the controller's super-sequence space: the whole
 * [LUSTRE_SEQ_SPACE_START, LUSTRE_SEQ_SPACE_LIMIT) range.
 */
286 static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
287 LUSTRE_SEQ_SPACE_START,
288 LUSTRE_SEQ_SPACE_LIMIT
/*
 * Initial value for the meta-sequence range; its initializer is not
 * visible in this (truncated) extract.
 */
291 static const struct lu_range LUSTRE_SEQ_META_INIT = {
/*
 * Persist the server sequence state (super/meta ranges) to backing store.
 * Currently a stub: the dt_device write is not yet implemented (see XXX),
 * so this presumably returns success unconditionally -- the return
 * statement is not visible in this extract.
 */
297 seq_server_write_state(struct lu_server_seq *seq,
298 const struct lu_context *ctx)
303 /* XXX: here should be calling struct dt_device methods to write
304 * sequence state to backing store. */
/*
 * Load the server sequence state from backing store.  Currently a stub
 * (see XXX); callers treat -ENODATA as "first run, nothing on disk" --
 * see seq_server_init().  The stubbed return value is not visible in
 * this extract.
 */
310 seq_server_read_state(struct lu_server_seq *seq,
311 const struct lu_context *ctx)
316 /* XXX: here should be calling struct dt_device methods to read the
317 * sequence state from backing store. */
/*
 * Carve a LUSTRE_SEQ_SUPER_CHUNK-sized slice out of the controller's
 * super-sequence space (seq->seq_ss_range) and return it in @range.
 * Warns when fewer than one full chunk remains; errors when the space is
 * fully exhausted.
 * NOTE(review): extract truncated; braces, the return statement and the
 * near-exhaustion branch ordering are only partially visible.  Also note
 * the CWARN text contains a typo ("exhauste") -- a code fix, not doable
 * in a comment-only update.
 */
323 seq_server_alloc_super(struct lu_server_seq *seq,
324 struct lu_range *range)
326 struct lu_range *ss_range = &seq->seq_ss_range;
330 if (ss_range->lr_end - ss_range->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
331 CWARN("super-sequence is going to exhauste soon. "
332 "Only can allocate "LPU64" sequences\n",
333 ss_range->lr_end - ss_range->lr_start);
/* hand out whatever remains (presumably; the assignment to *range
* in this branch is not fully visible in this extract) */
335 ss_range->lr_start = ss_range->lr_end;
337 } else if (ss_range->lr_start >= ss_range->lr_end) {
338 CERROR("super-sequence is exhausted\n");
/* normal case: advance lr_start by one chunk and return the slice */
341 range->lr_start = ss_range->lr_start;
342 ss_range->lr_start += LUSTRE_SEQ_SUPER_CHUNK;
343 range->lr_end = ss_range->lr_start;
348 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated super-sequence "
349 "["LPX64"-"LPX64"]\n", range->lr_start,
/*
 * Carve a LUSTRE_SEQ_META_CHUNK-sized slice out of the server's
 * meta-sequence space (seq->seq_ms_range) into @range.  When the space is
 * empty, refill it: a controller node allocates a new super-sequence
 * locally (seq_server_alloc_super), a regular node requests one over RPC
 * from the controller via its client seq (seq_client_alloc_super).
 * NOTE(review): extract truncated; braces, rc checks and the return path
 * are only partially visible.
 */
357 seq_server_alloc_meta(struct lu_server_seq *seq,
358 struct lu_range *range)
360 struct lu_range *ms_range = &seq->seq_ms_range;
364 LASSERT(range_is_sane(ms_range));
366 /* XXX: here should avoid cascading RPCs using kind of async
367 * preallocation when meta-sequence is close to exhausting. */
368 if (ms_range->lr_start == ms_range->lr_end) {
369 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
370 /* allocate new range of meta-sequences to allocate new
371 * meta-sequence from it. */
372 rc = seq_server_alloc_super(seq, ms_range);
374 /* request controller to allocate new super-sequence for
376 rc = seq_client_alloc_super(seq->seq_cli);
378 CERROR("can't allocate new super-sequence, "
383 /* saving new range into allocation space. */
384 *ms_range = seq->seq_cli->seq_cl_range;
387 LASSERT(ms_range->lr_start != 0);
388 LASSERT(ms_range->lr_end > ms_range->lr_start);
/* hand out one chunk from the (possibly refreshed) meta space */
392 range->lr_start = ms_range->lr_start;
393 ms_range->lr_start += LUSTRE_SEQ_META_CHUNK;
394 range->lr_end = ms_range->lr_start;
397 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated meta-sequence "
398 "["LPX64"-"LPX64"]\n", range->lr_start,
/*
 * Dispatch a sequence allocation request to the proper allocator
 * (SEQ_ALLOC_SUPER vs. the meta case) and persist the updated state via
 * seq_server_write_state().
 * NOTE(review): extract truncated; the switch framing, locking and return
 * path are only partially visible here.
 */
406 seq_server_handle(struct lu_server_seq *seq,
407 const struct lu_context *ctx,
408 struct lu_range *range,
417 case SEQ_ALLOC_SUPER:
418 rc = seq_server_alloc_super(seq, range);
/* presumably the SEQ_ALLOC_META case label sits just above this
* line; it is not visible in this extract */
421 rc = seq_server_alloc_meta(seq, range);
/* persist the new range state after a successful allocation */
431 rc = seq_server_write_state(seq, ctx);
433 CERROR("can't save state, rc = %d\n",
/*
 * Unpack one SEQ_QUERY request: swab the 32-bit option (buffer 0) and the
 * lu_range (buffer 1) out of the request, pack a single-buffer reply, and
 * let seq_server_handle() fill the output range in the reply buffer.
 * NOTE(review): extract truncated; the NULL checks feeding the two CERROR
 * branches at the bottom, and the return path, are not visible here.
 */
444 seq_req_handle0(const struct lu_context *ctx,
445 struct lu_server_seq *seq,
446 struct ptlrpc_request *req)
449 struct lu_range *out;
450 int size = sizeof(*in);
455 rc = lustre_pack_reply(req, 1, &size, NULL);
/* request buffer 0: option code (byte-swapped as generic 32-bit) */
460 opt = lustre_swab_reqbuf(req, 0, sizeof(*opt),
461 lustre_swab_generic_32s);
/* request buffer 1: input range */
463 in = lustre_swab_reqbuf(req, 1, sizeof(*in),
464 lustre_swab_lu_range);
466 out = lustre_msg_buf(req->rq_repmsg,
468 LASSERT(out != NULL);
471 rc = seq_server_handle(seq, ctx, out, *opt);
473 CERROR("Cannot unpack seq range\n");
476 CERROR("Cannot unpack option\n");
/*
 * Top-level ptlrpc handler for the sequence service.  Validates the
 * opcode (only SEQ_QUERY is served), requires a connected export, routes
 * the request to seq_req_handle0() with the site's server seq, and sends
 * the reply (with OBD_FAIL_SEQ_ALL_REPLY_NET failure injection).
 * NOTE(review): extract truncated; the GOTO targets and final return are
 * only partially visible here.
 */
482 seq_req_handle(struct ptlrpc_request *req)
484 int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
485 const struct lu_context *ctx;
486 struct lu_site *site;
/* fault-injection hook: optionally drop the reply once */
490 OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
492 ctx = req->rq_svc_thread->t_ctx;
493 LASSERT(ctx != NULL);
494 LASSERT(ctx->lc_thread == req->rq_svc_thread);
495 if (req->rq_reqmsg->opc == SEQ_QUERY) {
496 if (req->rq_export != NULL) {
497 struct obd_device *obd;
/* locate the per-site server seq through the export's device */
499 obd = req->rq_export->exp_obd;
500 site = obd->obd_lu_dev->ld_site;
501 LASSERT(site != NULL);
503 rc = seq_req_handle0(ctx, site->ls_server_seq, req);
505 CERROR("Unconnected request\n");
506 req->rq_status = -ENOTCONN;
507 GOTO(out, rc = -ENOTCONN);
510 CERROR("Wrong opcode: %d\n",
511 req->rq_reqmsg->opc);
512 req->rq_status = -ENOTSUPP;
513 rc = ptlrpc_error(req);
519 target_send_reply(req, rc, fail);
/*
 * Initialize the server-side sequence manager:
 *  - pin the backing dt_device,
 *  - read persisted state; on -ENODATA (first run) seed the super range
 *    (from LUSTRE_SEQ_SUPER_INIT on a controller, or from the client
 *    seq's range otherwise) and the meta range, then write the state back,
 *  - start the SEQ_QUERY ptlrpc service and its threads.
 * @flags must contain LUSTRE_SRV_SEQ_CONTROLLER or LUSTRE_SRV_SEQ_REGULAR.
 * NOTE(review): extract truncated; the assignments of seq->seq_dev /
 * seq->seq_cli, several rc checks, GOTO labels and the final return are
 * not visible here.
 */
524 seq_server_init(struct lu_server_seq *seq,
525 struct lu_client_seq *cli,
526 const struct lu_context *ctx,
527 struct dt_device *dev,
/* service tunables for the sequence server */
533 struct ptlrpc_service_conf seq_conf = {
534 .psc_nbufs = MDS_NBUFS,
535 .psc_bufsize = MDS_BUFSIZE,
536 .psc_max_req_size = MDS_MAXREQSIZE,
537 .psc_max_reply_size = MDS_MAXREPSIZE,
538 .psc_req_portal = MDS_SEQ_PORTAL,
539 .psc_rep_portal = MDC_REPLY_PORTAL,
540 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
541 .psc_num_threads = SEQ_NUM_THREADS
544 LASSERT(dev != NULL);
545 LASSERT(cli != NULL);
547 LASSERT(flags & (LUSTRE_SRV_SEQ_CONTROLLER |
548 LUSTRE_SRV_SEQ_REGULAR));
552 seq->seq_flags = flags;
553 sema_init(&seq->seq_sem, 1);
/* hold a reference on the backing device (dropped in seq_server_fini) */
555 lu_device_get(&seq->seq_dev->dd_lu_dev);
557 /* request backing store for saved sequence info */
558 rc = seq_server_read_state(seq, ctx);
559 if (rc == -ENODATA) {
560 /* first run, no state on disk, init all seqs */
561 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
562 /* init super seq by start values on sequence-controller
564 seq->seq_ss_range = LUSTRE_SEQ_SUPER_INIT;
566 /* take super-seq from client seq mgr */
567 LASSERT(range_is_sane(&cli->seq_cl_range));
568 seq->seq_ss_range = cli->seq_cl_range;
571 /* init meta-sequence by start values and get ready for
572 * allocating it for clients. */
573 seq->seq_ms_range = LUSTRE_SEQ_META_INIT;
575 /* save init seq to backing store. */
576 rc = seq_server_write_state(seq, ctx);
578 CERROR("can't write sequence state, "
583 CERROR("can't read sequence state, rc = %d\n",
/* bring up the RPC service and worker threads */
588 seq->seq_service = ptlrpc_init_svc_conf(&seq_conf,
593 if (seq->seq_service != NULL)
594 rc = ptlrpc_start_threads(NULL, seq->seq_service,
/* error path: tear down whatever got initialized (presumably on
* rc != 0 -- the surrounding condition is not visible here) */
603 seq_server_fini(seq, ctx);
605 CDEBUG(D_INFO, "Server Sequence Manager initialized\n");
608 EXPORT_SYMBOL(seq_server_init);
/*
 * Shut down the server sequence manager: stop/unregister the RPC service,
 * flush the current sequence state to backing store, and drop the device
 * reference taken in seq_server_init().  Safe on partial initialization
 * (both pointers are NULL-checked).
 * NOTE(review): extract truncated; a seq_dev = NULL reset and the return,
 * if any, are not visible here.
 */
611 seq_server_fini(struct lu_server_seq *seq,
612 const struct lu_context *ctx)
616 if (seq->seq_service != NULL) {
617 ptlrpc_unregister_service(seq->seq_service);
618 seq->seq_service = NULL;
621 if (seq->seq_dev != NULL) {
/* best-effort state flush; failure is logged, not propagated */
622 rc = seq_server_write_state(seq, ctx);
624 CERROR("can't save sequence state, "
627 lu_device_put(&seq->seq_dev->dd_lu_dev);
631 CDEBUG(D_INFO, "Server Sequence Manager finalized\n");
633 EXPORT_SYMBOL(seq_server_fini);
/*
 * Module boilerplate: fid_init()/fid_fini() do setup/teardown (only a
 * banner CDEBUG is visible here; cache init/free are stubs per the
 * comments), wired to module load/unload via cfs_module().
 * NOTE(review): extract truncated; function bodies and returns are only
 * partially visible.
 */
635 static int fid_init(void)
638 CDEBUG(D_INFO, "Lustre Sequence Manager\n");
642 static int fid_fini(void)
649 __init fid_mod_init(void)
652 /* init caches if any */
658 __exit fid_mod_exit(void)
660 /* free caches if any */
665 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
666 MODULE_DESCRIPTION("Lustre FID Module");
667 MODULE_LICENSE("GPL");
669 cfs_module(fid, "0.0.3", fid_mod_init, fid_mod_exit);