1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_handler.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
/* sequence space, starts from 0x400 to have first 0x400 sequences used for
 * special purposes.
 * NOTE(review): the initializer body of this range is not visible in this
 * view of the file -- confirm bounds against the full source. */
const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
/* zero range, used for init and other purposes.
 * NOTE(review): initializer body not visible in this view. */
const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
EXPORT_SYMBOL(LUSTRE_SEQ_ZERO_RANGE);
/* Persist server sequence state to backing store.
 * Currently a stub -- see the XXX below.
 * NOTE(review): the return-type line, braces and return statement of this
 * function are not visible in this view of the file. */
seq_server_write_state(struct lu_server_seq *seq,
                       const struct lu_context *ctx)
        /* XXX: here should be calling struct dt_device methods to write
         * sequence state to backing store. */
/* Load server sequence state from backing store.
 * Currently a stub -- see the XXX below.
 * NOTE(review): the return-type line, braces and return statement of this
 * function are not visible in this view of the file. */
seq_server_read_state(struct lu_server_seq *seq,
                      const struct lu_context *ctx)
        /* XXX: here should be calling struct dt_device methods to read the
         * sequence state from backing store. */
/* on controller node, allocate new super sequence for regular sequence
 * users.
 * NOTE(review): several lines of this function (the return-type line, the
 * declaration of @rc, closing braces, RETURN statements and some `if (rc)`
 * headers) are missing from this view; comments describe only what is
 * visible. */
__seq_server_alloc_super(struct lu_server_seq *seq,
                         struct lu_range *range,
                         const struct lu_context *ctx)
        /* remaining global space from which super-sequences are carved */
        struct lu_range *space = &seq->seq_space;

        LASSERT(range_is_sane(space));

        if (range_space(space) < LUSTRE_SEQ_SUPER_WIDTH) {
                CWARN("sequences space is going to exhauste soon. "
                      "Only can allocate "LPU64" sequences\n",
                      space->lr_end - space->lr_start);
                /* presumably the remainder is handed out and start == end
                 * marks the space consumed -- the intervening line is not
                 * visible here; confirm against the full source. */
                space->lr_start = space->lr_end;
        } else if (range_is_exhausted(space)) {
                /* nothing left to allocate from */
                CERROR("sequences space is exhausted\n");
        /* carve a SUPER_WIDTH-wide chunk out of the global space */
        range_alloc(range, space, LUSTRE_SEQ_SUPER_WIDTH);

        /* persist the updated space so the allocation survives restart */
        rc = seq_server_write_state(seq, ctx);
                CERROR("can't save state, rc = %d\n",

        CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated super-sequence "
               "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
/* Public entry point for super-sequence allocation; delegates to
 * __seq_server_alloc_super().
 * NOTE(review): the return-type line and surrounding lines (presumably
 * seq_sem lock/unlock around the call) are not visible in this view. */
seq_server_alloc_super(struct lu_server_seq *seq,
                       struct lu_range *range,
                       const struct lu_context *ctx)
        rc = __seq_server_alloc_super(seq, range, ctx);
/* Allocate a new meta-sequence out of this server's super-sequence,
 * fetching a fresh super-sequence from the controller when the current one
 * is exhausted.
 * NOTE(review): the return-type line, @rc declaration, closing braces,
 * RETURN statements and some `if (...)` headers are missing from this view;
 * comments describe only what is visible. */
__seq_server_alloc_meta(struct lu_server_seq *seq,
                        struct lu_range *range,
                        const struct lu_context *ctx)
        /* super-sequence this server carves meta-sequences from */
        struct lu_range *super = &seq->seq_super;

        LASSERT(range_is_sane(super));

        /* XXX: here we should avoid cascading RPCs using kind of async
         * preallocation when meta-sequence is close to exhausting. */
        if (range_is_exhausted(super)) {
                /* no controller client configured -- cannot refill */
                CERROR("no seq-controller client is setup\n");

                /* allocate new super-sequence. */
                rc = seq_client_alloc_super(seq->seq_cli);
                        CERROR("can't allocate new super-sequence, "

                /* saving new range into allocation space. */
                *super = seq->seq_cli->seq_range;
                LASSERT(range_is_sane(super));
        /* carve a META_WIDTH-wide chunk out of the super-sequence */
        range_alloc(range, super, LUSTRE_SEQ_META_WIDTH);

        /* persist updated state so allocation survives restart */
        rc = seq_server_write_state(seq, ctx);
                CERROR("can't save state, rc = %d\n",

        CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): allocated meta-sequence "
               "["LPX64"-"LPX64"]\n", range->lr_start, range->lr_end);
/* Public entry point for meta-sequence allocation; delegates to
 * __seq_server_alloc_meta().
 * NOTE(review): the return-type line and surrounding lines (presumably
 * seq_sem lock/unlock around the call) are not visible in this view. */
seq_server_alloc_meta(struct lu_server_seq *seq,
                      struct lu_range *range,
                      const struct lu_context *ctx)
        rc = __seq_server_alloc_meta(seq, range, ctx);
/* Dispatch a sequence-allocation request by opcode (super vs. meta).
 * NOTE(review): the return-type line, final parameter, switch header,
 * default case and RETURN are not visible in this view. */
seq_server_handle(struct lu_server_seq *seq,
                  const struct lu_context *ctx,
                  struct lu_range *range,
        case SEQ_ALLOC_SUPER:
                /* controller node: hand out a new super-sequence */
                rc = seq_server_alloc_super(seq, range, ctx);
                /* presumably the SEQ_ALLOC_META case -- its case label line
                 * is missing from this view */
                rc = seq_server_alloc_meta(seq, range, ctx);
/* Unpack a SEQ_QUERY request, perform the requested allocation and pack
 * the resulting range into the reply.
 * NOTE(review): the return-type line, @opc/@rc declarations, several
 * `if (...)` headers, GOTO/RETURN statements and the out_pill label are not
 * visible in this view; comments describe only what is visible. */
seq_req_handle0(const struct lu_context *ctx,
                struct lu_server_seq *seq,
                struct ptlrpc_request *req)
        int rep_buf_size[2] = { 0, };
        struct req_capsule pill;
        struct lu_range *out;

        /* set up request/reply buffer descriptors for this RPC */
        req_capsule_init(&pill, req, RCL_SERVER,
        req_capsule_set(&pill, &RQF_SEQ_QUERY);
        req_capsule_pack(&pill);

        /* opcode selects super vs. meta allocation */
        opc = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
        /* reply buffer the allocated range is written into */
        out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
                CERROR("can't get range buffer\n");
                GOTO(out_pill, rc= -EPROTO);
        rc = seq_server_handle(seq, ctx, out, *opc);
                CERROR("cannot unpack client request\n");
        req_capsule_fini(&pill);
/* Top-level ptlrpc handler for the sequence service: validates the
 * request, routes SEQ_QUERY to seq_req_handle0() and sends the reply.
 * NOTE(review): the return-type line, @rc declaration, several braces,
 * `else` lines, the `out:` label and RETURN are not visible in this view;
 * comments describe only what is visible. */
seq_req_handle(struct ptlrpc_request *req)
        int fail = OBD_FAIL_SEQ_ALL_REPLY_NET;
        const struct lu_context *ctx;
        struct lu_site *site;

        /* fault-injection hook for reply-net failures in testing */
        OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);

        /* per-thread execution context set up by the service thread */
        ctx = req->rq_svc_thread->t_ctx;
        LASSERT(ctx != NULL);
        LASSERT(ctx->lc_thread == req->rq_svc_thread);
        if (req->rq_reqmsg->opc == SEQ_QUERY) {
                if (req->rq_export != NULL) {
                        struct obd_device *obd;

                        /* reach the server-side seq manager via the
                         * export's device site */
                        obd = req->rq_export->exp_obd;
                        site = obd->obd_lu_dev->ld_site;
                        LASSERT(site != NULL);
                        rc = seq_req_handle0(ctx, site->ls_server_seq, req);
                        /* request arrived without a connected export */
                        CERROR("Unconnected request\n");
                        req->rq_status = -ENOTCONN;
                        GOTO(out, rc = -ENOTCONN);
                /* any opcode other than SEQ_QUERY is unsupported here */
                CERROR("Wrong opcode: %d\n",
                       req->rq_reqmsg->opc);
                req->rq_status = -ENOTSUPP;
                rc = ptlrpc_error(req);
        target_send_reply(req, rc, fail);
/* assigns client to sequence controller node.
 * On first assignment (super-sequence still zero) it fetches an initial
 * super-sequence from the controller and persists it.
 * NOTE(review): the return-type line, @rc declaration, several braces,
 * lock/unlock lines, RETURN statements and the actual controller-assignment
 * statement are not visible in this view. */
seq_server_controller(struct lu_server_seq *seq,
                      struct lu_client_seq *cli,
                      const struct lu_context *ctx)
        LASSERT(cli != NULL);
                /* refuse double assignment of a controller client */
                CERROR("SEQ-MGR(srv): sequence-controller "
                       "is already assigned\n");
        CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): assign "
               "sequence controller client %s\n",
               cli->seq_exp->exp_client_uuid.uuid);

        /* assign controller */
        /* get new range from controller only if super-sequence is not yet
         * initialized from backing store or something else. */
        if (range_is_zero(&seq->seq_super)) {
                /* release sema to avoid deadlock for case we're asking our
                 * selves -- the rest of this comment is not visible here */
                rc = seq_client_alloc_super(cli);
                        CERROR("can't allocate super-sequence, "

                /* take super-seq from client seq mgr */
                LASSERT(range_is_sane(&cli->seq_range));
                seq->seq_super = cli->seq_range;

                /* save init seq to backing store. */
                rc = seq_server_write_state(seq, ctx);
                        CERROR("can't write sequence state, "
EXPORT_SYMBOL(seq_server_controller);
/* Initialize the server-side sequence manager: set defaults, read saved
 * state from backing store, start the ptlrpc service and its threads.
 * NOTE(review): the return-type line, @rc declaration, seq->seq_dev
 * assignment, several `if`/`else`/brace lines, GOTO/RETURN statements, the
 * trailing ptlrpc_init_svc_conf() arguments and cleanup labels are not
 * visible in this view; comments describe only what is visible. */
seq_server_init(struct lu_server_seq *seq,
                struct dt_device *dev, int flags,
                const struct lu_context *ctx)
        /* configuration for the sequence RPC service */
        struct ptlrpc_service_conf seq_conf = {
                .psc_nbufs = MDS_NBUFS,
                .psc_bufsize = MDS_BUFSIZE,
                .psc_max_req_size = MDS_MAXREQSIZE,
                .psc_max_reply_size = MDS_MAXREPSIZE,
                .psc_req_portal = MDS_SEQ_PORTAL,
                .psc_rep_portal = MDC_REPLY_PORTAL,
                .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
                .psc_num_threads = SEQ_NUM_THREADS

        LASSERT(dev != NULL);

        seq->seq_flags = flags;
        /* serializes allocation and state updates */
        sema_init(&seq->seq_sem, 1);

        /* start from the full space; super-sequence stays zero until it is
         * read from disk or assigned by the controller */
        seq->seq_space = LUSTRE_SEQ_SPACE_RANGE;
        seq->seq_super = LUSTRE_SEQ_ZERO_RANGE;

        /* pin the backing device for the manager's lifetime
         * (released in seq_server_fini()) */
        lu_device_get(&seq->seq_dev->dd_lu_dev);

        /* request backing store for saved sequence info */
        rc = seq_server_read_state(seq, ctx);
        if (rc == -ENODATA) {
                /* first start: no persistent state yet */
                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(srv): no data on "
                       "disk found, waiting for controller assign\n");
                CERROR("can't read sequence state, rc = %d\n",

        seq->seq_service = ptlrpc_init_svc_conf(&seq_conf,
        if (seq->seq_service != NULL)
                rc = ptlrpc_start_threads(NULL, seq->seq_service,
        /* undo partial setup on failure */
        seq_server_fini(seq, ctx);
        CDEBUG(D_INFO|D_WARNING, "Server Sequence "
EXPORT_SYMBOL(seq_server_init);
/* Tear down the server-side sequence manager: stop the RPC service and
 * drop the backing-device reference taken in seq_server_init().
 * NOTE(review): the return-type line, some braces (including the
 * seq->seq_dev = NULL reset, if any) and the trailing CDEBUG arguments are
 * not visible in this view. */
seq_server_fini(struct lu_server_seq *seq,
                const struct lu_context *ctx)
        if (seq->seq_service != NULL) {
                ptlrpc_unregister_service(seq->seq_service);
                seq->seq_service = NULL;

        if (seq->seq_dev != NULL) {
                /* balances lu_device_get() in seq_server_init() */
                lu_device_put(&seq->seq_dev->dd_lu_dev);

        CDEBUG(D_INFO|D_WARNING, "Server Sequence "
EXPORT_SYMBOL(seq_server_fini);
/* One-time FID subsystem initialization.
 * NOTE(review): function body is not visible in this view of the file. */
static int fid_init(void)
/* FID subsystem cleanup, paired with fid_init().
 * NOTE(review): function body is not visible in this view of the file. */
static int fid_fini(void)
/* Kernel module entry point, registered via cfs_module() below.
 * NOTE(review): the return-type line and most of the body are not visible
 * in this view of the file. */
__init fid_mod_init(void)
        /* init caches if any */
/* Kernel module exit point, registered via cfs_module() below.
 * NOTE(review): the return-type line and most of the body are not visible
 * in this view of the file. */
__exit fid_mod_exit(void)
        /* free caches if any */
/* Kernel module metadata and registration of the init/exit hooks. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre FID Module");
MODULE_LICENSE("GPL");

cfs_module(fid, "0.0.4", fid_mod_init, fid_mod_exit);