1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
50 /* client seq mgr interface */
/*
 * seq_client_rpc(): send a SEQ_QUERY RPC to the sequence server and
 * receive an allocated lu_range back in the reply.
 *
 * NOTE(review): this extract is incomplete -- the return-type line,
 * several statements (opcode packing, error checks, ENTRY/RETURN) and
 * the closing brace are missing (original line numbers jump).  Comments
 * below describe only what the visible lines show.
 */
52 seq_client_rpc(struct lu_client_seq *seq,
53 struct lu_range *range,
/* reply carries one lu_range; request carries a single __u32 opcode */
56 int repsize = sizeof(struct lu_range);
57 int rc, reqsize = sizeof(__u32);
58 struct ptlrpc_request *req;
/* build the request against the import of this client's export */
63 req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp),
64 LUSTRE_MDS_VERSION, SEQ_QUERY,
/* 'op' points at request buffer 0 where the opcode is stored */
69 op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
72 req->rq_replen = lustre_msg_size(1, &repsize);
73 req->rq_request_portal = MDS_SEQ_PORTAL;
/* synchronous send; rc checked in lines not visible in this extract */
74 rc = ptlrpc_queue_wait(req);
/* unpack (and byte-swap if needed) the returned range */
78 ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
79 lustre_swab_lu_range);
/* reply buffer missing/invalid -> protocol error */
82 CERROR("invalid range is returned\n");
83 GOTO(out_req, rc = -EPROTO);
/* out_req: drop the request reference on all paths */
88 ptlrpc_req_finished(req);
92 /* request sequence-controller node to allocate new super-sequence. */
/*
 * NOTE(review): incomplete extract -- return type, RPC opcode argument,
 * error handling and closing brace are not visible here.
 * Only valid on a server-side client instance (SEQ_SERVER flag).
 */
94 seq_client_alloc_super(struct lu_client_seq *seq)
99 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
/* result is stored into seq->seq_cl_range by seq_client_rpc() */
100 rc = seq_client_rpc(seq, &seq->seq_cl_range,
103 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated super-sequence "
104 "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
105 seq->seq_cl_range.lr_end);
109 EXPORT_SYMBOL(seq_client_alloc_super);
111 /* request sequence-controller node to allocate new meta-sequence. */
/*
 * NOTE(review): incomplete extract -- return type, RPC opcode argument,
 * error handling and closing brace are not visible here.
 * Only valid on a client-side instance (SEQ_CLIENT flag); mirror of
 * seq_client_alloc_super() above.
 */
113 seq_client_alloc_meta(struct lu_client_seq *seq)
118 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
100 rc = seq_client_rpc(seq, &seq->seq_cl_range,
122 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated meta-sequence "
123 "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
124 seq->seq_cl_range.lr_end);
128 EXPORT_SYMBOL(seq_client_alloc_meta);
130 /* allocate new sequence for client (llite or MDC are expected to use this) */
/*
 * seq_client_alloc_seq(): hand out the next sequence number from the
 * cached meta-range, refilling the range via seq_client_alloc_meta()
 * when it is exhausted.
 *
 * NOTE(review): incomplete extract -- return type, locking (seq_sem,
 * presumably), else-branch structure and RETURN are not visible.
 */
132 seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
139 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
140 LASSERT(range_is_sane(&seq->seq_cl_range));
142 /* if we still have free sequences in meta-sequence we allocate new seq
143 * from given range. */
144 if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
/* fast path: consume the next free seqnr from the cached range */
145 *seqnr = seq->seq_cl_range.lr_start;
146 seq->seq_cl_range.lr_start += 1;
149 /* meta-sequence is exhausted, request MDT to allocate new
150 * meta-sequence for us. */
151 rc = seq_client_alloc_meta(seq);
153 CERROR("can't allocate new meta-sequence, "
/* after a successful refill, take the first seqnr of the new range */
157 *seqnr = seq->seq_cl_range.lr_start;
158 seq->seq_cl_range.lr_start += 1;
163 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated sequence "
164 "["LPX64"]\n", *seqnr);
168 EXPORT_SYMBOL(seq_client_alloc_seq);
/*
 * seq_client_alloc_fid(): produce the next FID from the cached
 * per-client fid (seq->seq_fid), rolling over to a freshly allocated
 * sequence when the object-id space (LUSTRE_SEQ_WIDTH) is used up.
 *
 * NOTE(review): incomplete extract -- return type, the copy of
 * seq->seq_fid into *fid, locking and RETURN are not visible here.
 */
171 seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
176 LASSERT(fid != NULL);
177 LASSERT(fid_is_sane(&seq->seq_fid));
178 LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
/* fast path: still room in the current sequence, just bump the oid */
181 if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
183 seq->seq_fid.f_oid += 1;
/* slow path: current sequence exhausted, get a new one from the MDT */
188 rc = seq_client_alloc_seq(seq, &seqnr);
190 CERROR("can't allocate new sequence, "
/* reset the cached fid to the start of the new sequence */
194 seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
195 seq->seq_fid.f_seq = seqnr;
196 seq->seq_fid.f_ver = 0;
199 seq->seq_fid.f_oid += 1;
202 LASSERT(fid_is_sane(fid));
204 CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
212 EXPORT_SYMBOL(seq_client_alloc_fid);
/*
 * seq_client_init(): initialize a client-side sequence manager.
 * Pins the export, zeroes the cached fid/range, then pre-allocates
 * either a first sequence (pure client case) or a super-sequence
 * (controller-talking server case) depending on 'flags'.
 *
 * NOTE(review): incomplete extract -- return type, the 'flags'
 * parameter declaration, error-path structure and RETURN are missing.
 */
215 seq_client_init(struct lu_client_seq *seq,
216 struct obd_export *exp,
/* exactly one of the two roles must be requested */
222 LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
223 LUSTRE_CLI_SEQ_SERVER));
225 seq->seq_flags = flags;
226 fid_zero(&seq->seq_fid);
227 sema_init(&seq->seq_sem, 1);
/* empty range == "exhausted", forces an allocation on first use */
229 seq->seq_cl_range.lr_end = 0;
230 seq->seq_cl_range.lr_start = 0;
/* hold a reference on the export for the lifetime of this manager;
 * released in seq_client_fini() */
233 seq->seq_exp = class_export_get(exp);
235 if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
238 /* client (llite or MDC) init case, we need new sequence from
239 * MDT. This will allocate new meta-sequemce first, because seq
240 * range in init state and looks the same as exhausted. */
241 rc = seq_client_alloc_seq(seq, &seqnr);
243 CERROR("can't allocate new sequence, rc %d\n", rc);
/* seed the cached fid with the newly allocated sequence */
246 seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
247 seq->seq_fid.f_seq = seqnr;
248 seq->seq_fid.f_ver = 0;
251 LASSERT(fid_is_sane(&seq->seq_fid));
253 /* check if this is controller node is trying to init client. */
255 /* MDT uses client seq manager to talk to sequence
256 * controller, and thus, we need super-sequence. */
257 rc = seq_client_alloc_super(seq);
/* error path: undo partial init (drops the export ref) */
266 seq_client_fini(seq);
268 CDEBUG(D_INFO, "Client Sequence Manager initialized\n");
271 EXPORT_SYMBOL(seq_client_init);
/*
 * seq_client_fini(): tear down a client sequence manager.  Releases the
 * export reference taken in seq_client_init().
 * NOTE(review): lines between the put and the CDEBUG (likely clearing
 * seq->seq_exp and closing braces) are missing from this extract.
 */
273 void seq_client_fini(struct lu_client_seq *seq)
276 if (seq->seq_exp != NULL) {
277 class_export_put(seq->seq_exp);
280 CDEBUG(D_INFO, "Client Sequence Manager finalized\n");
283 EXPORT_SYMBOL(seq_client_fini);
286 /* server side seq mgr stuff */
/* initial super-sequence range: the whole sequence space, used on the
 * controller node at first start (no state on disk yet) */
287 static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
288 LUSTRE_SEQ_SPACE_START,
289 LUSTRE_SEQ_SPACE_LIMIT
/* initial meta-sequence range; initializer body is missing from this
 * extract (original lines 293+ absent) */
292 static const struct lu_range LUSTRE_SEQ_META_INIT = {
/*
 * seq_server_write_state(): persist server sequence state.
 * Currently a stub -- see the XXX below; body/return not visible here.
 */
298 seq_server_write_state(struct lu_server_seq *seq,
299 const struct lu_context *ctx)
304 /* XXX: here should be calling struct dt_device methods to write
305 * sequence state to backing store. */
/*
 * seq_server_read_state(): load server sequence state from disk.
 * Currently a stub -- see the XXX below; body/return not visible here.
 * Callers treat -ENODATA as "first run, no state yet" (see
 * seq_server_init()).
 */
311 seq_server_read_state(struct lu_server_seq *seq,
312 const struct lu_context *ctx)
317 /* XXX: here should be calling struct dt_device methods to read the
318 * sequence state from backing store. */
/*
 * seq_server_alloc_super(): carve a LUSTRE_SEQ_SUPER_CHUNK-sized slice
 * out of the controller's super-sequence space (seq->seq_ss_range) and
 * return it in *range.
 *
 * NOTE(review): incomplete extract -- return type, locking, the branch
 * structure around the near-exhaustion case and RETURN are missing.
 */
324 seq_server_alloc_super(struct lu_server_seq *seq,
325 struct lu_range *range)
327 struct lu_range *ss_range = &seq->seq_ss_range;
/* warn when fewer than one full chunk remains */
331 if (ss_range->lr_end - ss_range->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
332 CWARN("super-sequence is going to exhauste soon. "
333 "Only can allocate "LPU64" sequences\n",
334 ss_range->lr_end - ss_range->lr_start);
/* hand out whatever remains as the final partial chunk */
336 ss_range->lr_start = ss_range->lr_end;
338 } else if (ss_range->lr_start >= ss_range->lr_end) {
339 CERROR("super-sequence is exhausted\n");
/* normal case: advance lr_start by one chunk; [start, new start) is
 * the allocated slice */
342 range->lr_start = ss_range->lr_start;
343 ss_range->lr_start += LUSTRE_SEQ_SUPER_CHUNK;
344 range->lr_end = ss_range->lr_start;
349 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated super-sequence "
350 "["LPX64"-"LPX64"]\n", range->lr_start,
/*
 * seq_server_alloc_meta(): carve a LUSTRE_SEQ_META_CHUNK-sized slice
 * out of the server's meta-sequence range (seq->seq_ms_range).  When
 * the range is used up it is refilled: a controller node allocates
 * directly from its own super-sequence, a regular node asks the
 * controller over RPC via its embedded client (seq->seq_cli).
 *
 * NOTE(review): incomplete extract -- return type, locking, error-path
 * structure and RETURN are missing (original line numbers jump).
 */
358 seq_server_alloc_meta(struct lu_server_seq *seq,
359 struct lu_range *range)
361 struct lu_range *ms_range = &seq->seq_ms_range;
365 LASSERT(range_is_sane(ms_range));
367 /* XXX: here should avoid cascading RPCs using kind of async
368 * preallocation when meta-sequence is close to exhausting. */
369 if (ms_range->lr_start == ms_range->lr_end) {
370 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
371 /* allocate new range of meta-sequences to allocate new
372 * meta-sequence from it. */
373 rc = seq_server_alloc_super(seq, ms_range);
375 /* request controller to allocate new super-sequence for
377 rc = seq_client_alloc_super(seq->seq_cli);
379 CERROR("can't allocate new super-sequence, "
384 /* saving new range into allocation space. */
385 *ms_range = seq->seq_cli->seq_cl_range;
/* sanity: the refilled range must be non-empty and non-zero based */
388 LASSERT(ms_range->lr_start != 0);
389 LASSERT(ms_range->lr_end > ms_range->lr_start);
/* carve one chunk off the (possibly refilled) meta range */
393 range->lr_start = ms_range->lr_start;
394 ms_range->lr_start += LUSTRE_SEQ_META_CHUNK;
395 range->lr_end = ms_range->lr_start;
398 CDEBUG(D_INFO, "SEQ-MGR(srv): allocated meta-sequence "
399 "["LPX64"-"LPX64"]\n", range->lr_start,
/*
 * seq_server_handle(): dispatch one sequence request opcode to the
 * matching allocator, then persist the updated state.
 *
 * NOTE(review): incomplete extract -- return type, the opcode
 * parameter, the switch statement head, the SEQ_ALLOC_META case label,
 * default case and RETURN are not visible here.
 */
407 seq_server_handle(struct lu_server_seq *seq,
408 const struct lu_context *ctx,
409 struct lu_range *range,
418 case SEQ_ALLOC_SUPER:
419 rc = seq_server_alloc_super(seq, range);
/* (case label for meta allocation is missing from this extract) */
422 rc = seq_server_alloc_meta(seq, range);
/* on success, write the new allocation state to backing store */
432 rc = seq_server_write_state(seq, ctx);
434 CERROR("can't save state, rc = %d\n",
/*
 * seq_req_handle0(): unpack one SEQ_QUERY request via the req_capsule
 * layout machinery, run seq_server_handle() and pack the resulting
 * range into the reply.
 *
 * NOTE(review): incomplete extract -- return type, 'opc' declaration,
 * several NULL/error checks and RETURN are not visible here.
 */
445 seq_req_handle0(const struct lu_context *ctx,
446 struct lu_server_seq *seq,
447 struct ptlrpc_request *req)
449 int rep_buf_size[2] = { 0, };
450 struct req_capsule pill;
451 struct lu_range *out;
/* set up reply-side capsule, bind the SEQ_QUERY request format */
456 req_capsule_init(&pill, req, RCL_SERVER,
459 req_capsule_set(&pill, &RQF_SEQ_QUERY);
460 req_capsule_pack(&pill);
/* opcode from client; range buffer in reply */
462 opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
464 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
466 CERROR("can't get range buffer\n");
467 GOTO(out_pill, rc= -EPROTO);
469 rc = seq_server_handle(seq, ctx, out, *opc);
471 CERROR("cannot unpack client request\n");
/* out_pill: release capsule resources on all paths */
476 req_capsule_fini(&pill);
/*
 * seq_req_handle(): top-level ptlrpc handler for the sequence service.
 * Validates the opcode and export, locates the per-site server seq
 * manager and delegates to seq_req_handle0(), then sends the reply.
 *
 * NOTE(review): incomplete extract -- return type, reply-pack step,
 * several closing braces and RETURN are not visible here.
 */
481 seq_req_handle(struct ptlrpc_request *req)
483 int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
484 const struct lu_context *ctx;
485 struct lu_site *site;
/* fault-injection hook: optionally drop the reply for testing */
489 OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
/* per-thread lu_context set up by the service thread */
491 ctx = req->rq_svc_thread->t_ctx;
492 LASSERT(ctx != NULL);
493 LASSERT(ctx->lc_thread == req->rq_svc_thread);
494 if (req->rq_reqmsg->opc == SEQ_QUERY) {
495 if (req->rq_export != NULL) {
496 struct obd_device *obd;
/* reach the server seq manager through the export's device site */
498 obd = req->rq_export->exp_obd;
499 site = obd->obd_lu_dev->ld_site;
500 LASSERT(site != NULL);
502 rc = seq_req_handle0(ctx, site->ls_server_seq, req);
/* no export: client never connected */
504 CERROR("Unconnected request\n");
505 req->rq_status = -ENOTCONN;
506 GOTO(out, rc = -ENOTCONN);
509 CERROR("Wrong opcode: %d\n",
510 req->rq_reqmsg->opc);
511 req->rq_status = -ENOTSUPP;
512 rc = ptlrpc_error(req);
/* out: reply (or simulated failure via 'fail') goes back to client */
518 target_send_reply(req, rc, fail);
/*
 * seq_server_init(): initialize the server-side sequence manager:
 * read (or first-time initialize) the on-disk sequence state, then
 * start the ptlrpc service that handles SEQ_QUERY requests.
 *
 * NOTE(review): incomplete extract -- return type, the 'flags'
 * parameter, assignments of seq_dev/seq_cli, the service-name argument
 * to ptlrpc_init_svc_conf(), error-path braces and RETURN are missing.
 */
523 seq_server_init(struct lu_server_seq *seq,
524 struct lu_client_seq *cli,
525 const struct lu_context *ctx,
526 struct dt_device *dev,
/* ptlrpc service parameters for the sequence service */
532 struct ptlrpc_service_conf seq_conf = {
533 .psc_nbufs = MDS_NBUFS,
534 .psc_bufsize = MDS_BUFSIZE,
535 .psc_max_req_size = MDS_MAXREQSIZE,
536 .psc_max_reply_size = MDS_MAXREPSIZE,
537 .psc_req_portal = MDS_SEQ_PORTAL,
538 .psc_rep_portal = MDC_REPLY_PORTAL,
539 .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
540 .psc_num_threads = SEQ_NUM_THREADS
543 LASSERT(dev != NULL);
544 LASSERT(cli != NULL);
/* server must be either the controller or a regular node */
546 LASSERT(flags & (LUSTRE_SRV_SEQ_CONTROLLER |
547 LUSTRE_SRV_SEQ_REGULAR));
551 seq->seq_flags = flags;
552 sema_init(&seq->seq_sem, 1);
/* pin the backing dt_device; released in seq_server_fini() */
554 lu_device_get(&seq->seq_dev->dd_lu_dev);
556 /* request backing store for saved sequence info */
557 rc = seq_server_read_state(seq, ctx);
558 if (rc == -ENODATA) {
559 /* first run, no state on disk, init all seqs */
560 if (seq->seq_flags & LUSTRE_SRV_SEQ_CONTROLLER) {
561 /* init super seq by start values on sequence-controller
563 seq->seq_ss_range = LUSTRE_SEQ_SUPER_INIT;
565 /* take super-seq from client seq mgr */
566 LASSERT(range_is_sane(&cli->seq_cl_range));
567 seq->seq_ss_range = cli->seq_cl_range;
570 /* init meta-sequence by start values and get ready for
571 * allocating it for clients. */
572 seq->seq_ms_range = LUSTRE_SEQ_META_INIT;
574 /* save init seq to backing store. */
575 rc = seq_server_write_state(seq, ctx);
577 CERROR("can't write sequence state, "
/* any read error other than -ENODATA is fatal */
582 CERROR("can't read sequence state, rc = %d\n",
/* bring up the RPC service and its worker threads */
587 seq->seq_service = ptlrpc_init_svc_conf(&seq_conf,
592 if (seq->seq_service != NULL)
593 rc = ptlrpc_start_threads(NULL, seq->seq_service,
/* error path: undo partial init */
602 seq_server_fini(seq, ctx);
604 CDEBUG(D_INFO, "Server Sequence Manager initialized\n");
607 EXPORT_SYMBOL(seq_server_init);
/*
 * seq_server_fini(): stop the sequence RPC service, flush state to the
 * backing store and drop the dt_device reference taken in
 * seq_server_init().
 * NOTE(review): return type and some cleanup lines (e.g. clearing
 * seq->seq_dev) are not visible in this extract.
 */
610 seq_server_fini(struct lu_server_seq *seq,
611 const struct lu_context *ctx)
615 if (seq->seq_service != NULL) {
616 ptlrpc_unregister_service(seq->seq_service);
617 seq->seq_service = NULL;
620 if (seq->seq_dev != NULL) {
/* best effort: failure to save is logged, not propagated */
621 rc = seq_server_write_state(seq, ctx);
623 CERROR("can't save sequence state, "
626 lu_device_put(&seq->seq_dev->dd_lu_dev);
630 CDEBUG(D_INFO, "Server Sequence Manager finalized\n");
632 EXPORT_SYMBOL(seq_server_fini);
/*
 * Module lifecycle stubs.
 * NOTE(review): bodies are almost entirely missing from this extract
 * (return statements, braces, fid_fini body, calls between
 * fid_mod_init/fid_init); only the skeleton lines below survive.
 */
634 static int fid_init(void)
637 CDEBUG(D_INFO, "Lustre Sequence Manager\n")
641 static int fid_fini(void)
648 __init fid_mod_init(void)
651 /* init caches if any */
657 __exit fid_mod_exit(void)
659 /* free caches if any */
/* Kernel module metadata and libcfs module registration (entry/exit
 * points fid_mod_init/fid_mod_exit, module version "0.0.3"). */
664 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
665 MODULE_DESCRIPTION("Lustre FID Module");
666 MODULE_LICENSE("GPL");
668 cfs_module(fid, "0.0.3", fid_mod_init, fid_mod_exit);