1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_request.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
49 #include <lustre_mdc.h>
50 #include "fid_internal.h"
/*
 * Issue one SEQ_QUERY request for operation @opc over the export held in
 * seq->lcs_exp and check the range that comes back.
 *
 * The request is allocated with the RQF_SEQ_QUERY layout and exposes two
 * client fields, RMF_SEQ_OPC and RMF_SEQ_RANGE. It is sent with
 * ptlrpc_queue_wait() while the client obd's cl_rpc_lock is held.
 *
 * @output is rejected with rc = -EINVAL when range_is_sane() fails or when
 * range_is_exhausted() reports it exhausted; both cases log a CERROR and
 * jump to out_req. @opcname is used in the debug message that names the
 * allocated range. The request is released with ptlrpc_req_finished().
 */
52 static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
53 struct lu_range *output, __u32 opc,
56 struct obd_export *exp = seq->lcs_exp;
57 struct ptlrpc_request *req;
58 struct lu_range *out, *in;
63 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
64 LUSTRE_MDS_VERSION, SEQ_QUERY);
68 /* Init operation code */
69 op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
72 /* Zero out input range, this is not recovery yet. */
73 in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
79 ptlrpc_request_set_replen(req);
        /*
         * SEQ_ALLOC_SUPER always targets SEQ_CONTROLLER_PORTAL; any other
         * opcode targets the metadata or the data portal.
         */
81 if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
82 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
83 SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
85 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
86 SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
        /* The send is bracketed by the client obd's RPC lock. */
89 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
90 rc = ptlrpc_queue_wait(req);
91 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
96 out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
99 if (!range_is_sane(output)) {
100 CERROR("%s: Invalid range received from server: "
101 DRANGE"\n", seq->lcs_name, PRANGE(output));
102 GOTO(out_req, rc = -EINVAL);
        /*
         * NOTE(review): the next two messages append "]" after DRANGE,
         * while the "Invalid range" message above does not. This looks
         * like a stray bracket; confirm against the DRANGE definition.
         */
105 if (range_is_exhausted(output)) {
106 CERROR("%s: Range received from server is exhausted: "
107 DRANGE"]\n", seq->lcs_name, PRANGE(output));
108 GOTO(out_req, rc = -EINVAL);
112 CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
113 seq->lcs_name, opcname, PRANGE(output));
117 ptlrpc_req_finished(req);
121 /* Request sequence-controller node to allocate new super-sequence. */
/*
 * The result is stored in seq->lcs_space. Two paths are present: a direct
 * seq_server_alloc_super() call on seq->lcs_srv, which requires a non-NULL
 * @env, and a SEQ_ALLOC_SUPER request sent through seq_client_rpc(). @range
 * is handed to either path as the input range.
 */
122 int seq_client_replay_super(struct lu_client_seq *seq,
123 struct lu_range *range,
124 const struct lu_env *env)
133 LASSERT(env != NULL);
134 rc = seq_server_alloc_super(seq->lcs_srv, range,
135 &seq->lcs_space, env);
138 rc = seq_client_rpc(seq, range, &seq->lcs_space,
139 SEQ_ALLOC_SUPER, "super");
147 /* Request sequence-controller node to allocate new super-sequence. */
/*
 * Thin wrapper: forwards to seq_client_replay_super() with a NULL input
 * range and returns its result.
 */
148 int seq_client_alloc_super(struct lu_client_seq *seq,
149 const struct lu_env *env)
152 RETURN(seq_client_replay_super(seq, NULL, env));
155 /* Request sequence-controller node to allocate new meta-sequence. */
/*
 * The result is stored in seq->lcs_space. Two paths are present: a direct
 * seq_server_alloc_meta() call on seq->lcs_srv, which requires a non-NULL
 * @env, and a SEQ_ALLOC_META request sent through seq_client_rpc(). Both
 * paths pass NULL as the input range.
 */
156 static int seq_client_alloc_meta(struct lu_client_seq *seq,
157 const struct lu_env *env)
164 LASSERT(env != NULL);
165 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
166 &seq->lcs_space, env);
169 rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
170 SEQ_ALLOC_META, "meta");
177 /* Allocate new sequence for client. */
/*
 * Hands out seq->lcs_space.lr_start through @seqnr and then advances
 * lr_start by one. When the cached range is exhausted it is refilled first
 * with seq_client_alloc_meta(). The range must be sane on entry and must
 * not be exhausted at the point the sequence number is taken; both are
 * enforced with LASSERT.
 */
178 static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
183 LASSERT(range_is_sane(&seq->lcs_space));
185 if (range_is_exhausted(&seq->lcs_space)) {
        /*
         * NOTE(review): env is passed as NULL here, while
         * seq_client_alloc_meta() asserts env != NULL before calling
         * seq_server_alloc_meta(). Confirm this call can never reach
         * that path.
         */
186 rc = seq_client_alloc_meta(seq, NULL);
188 CERROR("%s: Can't allocate new meta-sequence, "
189 "rc %d\n", seq->lcs_name, rc);
192 CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
193 seq->lcs_name, PRANGE(&seq->lcs_space));
199 LASSERT(!range_is_exhausted(&seq->lcs_space));
200 *seqnr = seq->lcs_space.lr_start;
201 seq->lcs_space.lr_start += 1;
203 CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
209 /* Allocate new fid on passed client @seq and save it to @fid. */
/*
 * @seq and @fid must not be NULL. A new sequence is requested through
 * seq_client_alloc_seq() when seq->lcs_fid is still zero or when its object
 * id has reached seq->lcs_width. The other visible path only increments
 * the object id of seq->lcs_fid.
 */
210 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
215 LASSERT(seq != NULL);
216 LASSERT(fid != NULL);
220 if (fid_is_zero(&seq->lcs_fid) ||
221 fid_oid(&seq->lcs_fid) >= seq->lcs_width)
225 rc = seq_client_alloc_seq(seq, &seqnr);
227 CERROR("%s: Can't allocate new sequence, "
228 "rc %d\n", seq->lcs_name, rc);
233 CDEBUG(D_INFO, "%s: Switch to sequence "
234 "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
        /* Start the new sequence at LUSTRE_FID_INIT_OID with version 0. */
236 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
237 seq->lcs_fid.f_seq = seqnr;
238 seq->lcs_fid.f_ver = 0;
241 * Inform caller that sequence switch is performed to allow it
242 * to setup FLD for it.
246 /* Just bump last allocated fid and return to caller. */
247 seq->lcs_fid.f_oid += 1;
254 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
257 EXPORT_SYMBOL(seq_client_alloc_fid);
260 * Finish the current sequence due to disconnect.
261 * See mdc_import_event()
/*
 * Reset the allocation state of @seq: zero seq->lcs_fid and seq->lcs_space.
 * A zero lcs_fid makes the next seq_client_alloc_fid() call take its
 * new-sequence branch. @seq must not be NULL.
 */
263 void seq_client_flush(struct lu_client_seq *seq)
265 LASSERT(seq != NULL);
267 fid_zero(&seq->lcs_fid);
268 range_zero(&seq->lcs_space);
271 EXPORT_SYMBOL(seq_client_flush);
/*
 * Forward declaration: seq_client_proc_init() calls seq_client_proc_fini()
 * before the latter is defined.
 */
273 static void seq_client_proc_fini(struct lu_client_seq *seq);
/*
 * Register the lprocfs directory for @seq under seq->lcs_name and attach
 * the seq_client_proc_list variables to it, with @seq as their data.
 *
 * A failed registration is logged and reported as the PTR_ERR() of the
 * returned pointer. A failed lprocfs_add_vars() is logged and jumps to
 * out_cleanup. seq_client_proc_fini() is the cleanup call.
 */
276 static int seq_client_proc_init(struct lu_client_seq *seq)
281 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
285 if (IS_ERR(seq->lcs_proc_dir)) {
286 CERROR("%s: LProcFS failed in seq-init\n",
288 rc = PTR_ERR(seq->lcs_proc_dir);
292 rc = lprocfs_add_vars(seq->lcs_proc_dir,
293 seq_client_proc_list, seq);
295 CERROR("%s: Can't init sequence manager "
296 "proc, rc %d\n", seq->lcs_name, rc);
297 GOTO(out_cleanup, rc);
303 seq_client_proc_fini(seq);
/*
 * Remove the proc directory of @seq, if one is recorded. An error pointer
 * left in seq->lcs_proc_dir by a failed lprocfs_register() is not passed to
 * lprocfs_remove(). The field is reset to NULL in either case.
 */
307 static void seq_client_proc_fini(struct lu_client_seq *seq)
310 if (seq->lcs_proc_dir) {
311 if (!IS_ERR(seq->lcs_proc_dir))
312 lprocfs_remove(&seq->lcs_proc_dir);
313 seq->lcs_proc_dir = NULL;
/*
 * NOTE(review): second definition of seq_client_proc_init() in this file.
 * Presumably the variant selected by a preprocessor conditional when the
 * lprocfs-backed version is compiled out; confirm.
 */
318 static int seq_client_proc_init(struct lu_client_seq *seq)
/*
 * NOTE(review): second definition of seq_client_proc_fini() in this file.
 * Presumably the variant selected by a preprocessor conditional when the
 * lprocfs-backed version is compiled out; confirm.
 */
323 static void seq_client_proc_fini(struct lu_client_seq *seq)
/*
 * Initialize the client sequence manager @seq.
 *
 * Records @type, initializes lcs_sem with a count of 1, sets lcs_width to
 * LUSTRE_SEQ_MAX_WIDTH and clears the fid/range state with
 * seq_client_flush(). seq->lcs_exp is replaced by the result of
 * class_export_get(), seq->lcs_name is formatted with snprintf(), and the
 * proc entries are created with seq_client_proc_init(). seq_client_fini()
 * is called from here as the undo step.
 *
 * @seq and @prefix must not be NULL.
 * NOTE(review): the lcs_srv and lcs_exp assertions presumably sit on
 * alternative branches (server-backed vs. export-backed client); confirm.
 */
329 int seq_client_init(struct lu_client_seq *seq,
330 struct obd_export *exp,
331 enum lu_cli_type type,
333 struct lu_server_seq *srv)
338 LASSERT(seq != NULL);
339 LASSERT(prefix != NULL);
343 seq->lcs_type = type;
344 sema_init(&seq->lcs_sem, 1);
345 seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
347 /* Make sure that things are clear before work is started. */
348 seq_client_flush(seq);
351 LASSERT(seq->lcs_srv != NULL);
353 LASSERT(seq->lcs_exp != NULL);
354 seq->lcs_exp = class_export_get(seq->lcs_exp);
357 snprintf(seq->lcs_name, sizeof(seq->lcs_name),
360 rc = seq_client_proc_init(seq);
362 seq_client_fini(seq);
365 EXPORT_SYMBOL(seq_client_init);
/*
 * Tear down @seq: remove its proc entries with seq_client_proc_fini() and,
 * when an export is attached, release it with class_export_put().
 */
367 void seq_client_fini(struct lu_client_seq *seq)
371 seq_client_proc_fini(seq);
373 if (seq->lcs_exp != NULL) {
374 class_export_put(seq->lcs_exp);
381 EXPORT_SYMBOL(seq_client_fini);