1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_request.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
49 #include <lustre_mdc.h>
50 #include "fid_internal.h"
/*
 * Send one SEQ_QUERY request with operation code @opc to the server behind
 * seq->lcs_exp and validate the range that comes back.
 *
 * @seq:    client sequence manager; lcs_exp, lcs_type and lcs_name are read.
 * @input:  range supplied by the caller.
 *          NOTE(review): how @input is copied into the request is not
 *          visible here; the in-code comment only mentions zeroing the
 *          request's range field -- confirm.
 * @output: range that is checked with range_is_sane() and
 *          range_is_exhausted() after the reply arrives.
 *          NOTE(review): the copy from the reply buffer `out` into @output
 *          is presumably done on a line not shown -- confirm.
 * @opc:    operation code; SEQ_ALLOC_SUPER is routed to
 *          SEQ_CONTROLLER_PORTAL, any other value to SEQ_METADATA_PORTAL
 *          (for LUSTRE_SEQ_METADATA clients) or SEQ_DATA_PORTAL.
 * opcname: name used only in the final debug message.
 *
 * Jumps to out_req with rc = -EINVAL when the received range is not sane or
 * is already exhausted.  ptlrpc_req_finished() is called on the request at
 * the end.
 */
52 static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
53 struct lu_range *output, __u32 opc,
56 struct obd_export *exp = seq->lcs_exp;
57 struct ptlrpc_request *req;
58 struct lu_range *out, *in;
63 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
64 LUSTRE_MDS_VERSION, SEQ_QUERY);
68 /* Init operation code */
69 op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
72 /* Zero out input range, this is not recovery yet. */
73 in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
79 ptlrpc_request_set_replen(req);
/*
 * Pick the request portal: super-sequence requests always go to the
 * controller portal, everything else to the portal matching lcs_type.
 */
81 if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
82 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
83 SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
85 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
86 SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
88 ptlrpc_at_set_req_timeout(req);
/* The send-and-wait is bracketed by the client obd's cl_rpc_lock. */
90 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
91 rc = ptlrpc_queue_wait(req);
92 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
97 out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
100 if (!range_is_sane(output)) {
101 CERROR("%s: Invalid range received from server: "
102 DRANGE"\n", seq->lcs_name, PRANGE(output));
103 GOTO(out_req, rc = -EINVAL);
106 if (range_is_exhausted(output)) {
/* DRANGE is followed directly by the newline, like the other range messages in this file. */
107 CERROR("%s: Range received from server is exhausted: "
108 DRANGE"\n", seq->lcs_name, PRANGE(output));
109 GOTO(out_req, rc = -EINVAL);
/* Same formatting rule as above: no extra bracket after DRANGE. */
113 CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"\n",
114 seq->lcs_name, opcname, PRANGE(output));
118 ptlrpc_req_finished(req);
122 /*
 * Obtain a super-sequence into seq->lcs_space, handing @range on as the
 * input range.  Two paths are visible: a direct seq_server_alloc_super()
 * call on seq->lcs_srv (which first asserts that @env is non-NULL), and a
 * SEQ_ALLOC_SUPER request sent through seq_client_rpc().
 * NOTE(review): the condition choosing between the two paths is not visible
 * here -- presumably whether seq->lcs_srv is set; confirm.
 */
123 int seq_client_replay_super(struct lu_client_seq *seq,
124 struct lu_range *range,
125 const struct lu_env *env)
/* Direct server call: @env is mandatory on this path. */
134 LASSERT(env != NULL);
135 rc = seq_server_alloc_super(seq->lcs_srv, range,
136 &seq->lcs_space, env);
/* RPC path: the result is stored straight into seq->lcs_space. */
139 rc = seq_client_rpc(seq, range, &seq->lcs_space,
140 SEQ_ALLOC_SUPER, "super");
148 /*
 * Request a new super-sequence.  Thin wrapper: calls
 * seq_client_replay_super() with a NULL input range and returns its result.
 */
149 int seq_client_alloc_super(struct lu_client_seq *seq,
150 const struct lu_env *env)
153 RETURN(seq_client_replay_super(seq, NULL, env));
156 /*
 * Allocate a new meta-sequence into seq->lcs_space.  Two paths are visible:
 * a direct seq_server_alloc_meta() call on seq->lcs_srv (which first asserts
 * that @env is non-NULL), and a SEQ_ALLOC_META request sent through
 * seq_client_rpc().  Both pass NULL as the input range.
 * NOTE(review): the condition choosing between the two paths is not visible
 * here -- presumably whether seq->lcs_srv is set; confirm.
 */
157 static int seq_client_alloc_meta(struct lu_client_seq *seq,
158 const struct lu_env *env)
/* Direct server call: @env is mandatory on this path. */
165 LASSERT(env != NULL);
166 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
167 &seq->lcs_space, env);
/* RPC path: the result is stored straight into seq->lcs_space. */
170 rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
171 SEQ_ALLOC_META, "meta");
178 /*
 * Allocate new sequence for client: hand out seq->lcs_space.lr_start through
 * @seqnr and advance lr_start by one.  When the current range is exhausted a
 * new meta-sequence is requested first via seq_client_alloc_meta().
 */
179 static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
184 LASSERT(range_is_sane(&seq->lcs_space));
186 if (range_is_exhausted(&seq->lcs_space)) {
/*
 * NOTE(review): env is passed as NULL here, while seq_client_alloc_meta()
 * has LASSERT(env != NULL) on its direct-server path -- confirm that path
 * cannot be reached from this caller.
 */
187 rc = seq_client_alloc_meta(seq, NULL);
189 CERROR("%s: Can't allocate new meta-sequence, "
190 "rc %d\n", seq->lcs_name, rc);
193 CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
194 seq->lcs_name, PRANGE(&seq->lcs_space));
/* At this point the range must contain at least one unused sequence. */
200 LASSERT(!range_is_exhausted(&seq->lcs_space));
/* Take the first sequence of the range and shrink the range from the front. */
201 *seqnr = seq->lcs_space.lr_start;
202 seq->lcs_space.lr_start += 1;
204 CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
210 /*
 * Allocate new fid on passed client @seq and save it to @fid.
 *
 * Both pointers are asserted non-NULL.  The last allocated FID is kept in
 * seq->lcs_fid: either a new sequence is obtained through
 * seq_client_alloc_seq() and lcs_fid restarts at LUSTRE_FID_INIT_OID with
 * version 0, or the object id of lcs_fid is simply incremented.
 * NOTE(review): the copy of lcs_fid into @fid, the locking and the return
 * values are on lines not visible here -- confirm before relying on them.
 */
211 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
216 LASSERT(seq != NULL);
217 LASSERT(fid != NULL);
/*
 * A sequence switch is triggered when lcs_fid is still zero or when its
 * object id has reached lcs_width.
 */
221 if (fid_is_zero(&seq->lcs_fid) ||
222 fid_oid(&seq->lcs_fid) >= seq->lcs_width)
226 rc = seq_client_alloc_seq(seq, &seqnr);
228 CERROR("%s: Can't allocate new sequence, "
229 "rc %d\n", seq->lcs_name, rc);
234 CDEBUG(D_INFO, "%s: Switch to sequence "
235 "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
/* Restart object numbering inside the freshly allocated sequence. */
237 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
238 seq->lcs_fid.f_seq = seqnr;
239 seq->lcs_fid.f_ver = 0;
242 * Inform caller that sequence switch is performed to allow it
243 * to setup FLD for it.
247 /* Just bump last allocated fid and return to caller. */
248 seq->lcs_fid.f_oid += 1;
255 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
258 EXPORT_SYMBOL(seq_client_alloc_fid);
261 * Finish the current sequence due to disconnect.
262 * See mdc_import_event()
264 void seq_client_flush(struct lu_client_seq *seq)
266 LASSERT(seq != NULL);
/*
 * Forget both the last allocated FID and the remaining sequence range.
 * A zero lcs_fid makes seq_client_alloc_fid() switch to a new sequence on
 * its next call.
 * NOTE(review): presumably a zeroed lcs_space also counts as exhausted, so
 * that a new meta-sequence is requested -- confirm against
 * range_is_exhausted().
 */
268 fid_zero(&seq->lcs_fid);
269 range_zero(&seq->lcs_space);
272 EXPORT_SYMBOL(seq_client_flush);
/*
 * Forward declaration: seq_client_proc_init() below calls
 * seq_client_proc_fini() before the latter is defined.
 */
274 static void seq_client_proc_fini(struct lu_client_seq *seq);
/*
 * Register an lprocfs directory named seq->lcs_name, store it in
 * seq->lcs_proc_dir and add the seq_client_proc_list entries to it.
 *
 * If registration fails, rc is taken from PTR_ERR() of the returned value.
 * If lprocfs_add_vars() fails, control jumps to out_cleanup.
 * NOTE(review): the seq_client_proc_fini() call at the end presumably sits
 * under the out_cleanup label; the label and return statements are not
 * visible here -- confirm.
 */
277 static int seq_client_proc_init(struct lu_client_seq *seq)
282 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
286 if (IS_ERR(seq->lcs_proc_dir)) {
287 CERROR("%s: LProcFS failed in seq-init\n",
289 rc = PTR_ERR(seq->lcs_proc_dir);
293 rc = lprocfs_add_vars(seq->lcs_proc_dir,
294 seq_client_proc_list, seq);
296 CERROR("%s: Can't init sequence manager "
297 "proc, rc %d\n", seq->lcs_name, rc);
298 GOTO(out_cleanup, rc);
304 seq_client_proc_fini(seq);
/*
 * Remove the lprocfs directory of @seq, if any, and reset lcs_proc_dir to
 * NULL.  A NULL lcs_proc_dir makes this a no-op.
 */
308 static void seq_client_proc_fini(struct lu_client_seq *seq)
311 if (seq->lcs_proc_dir) {
/*
 * lcs_proc_dir may still hold the error pointer stored by a failed
 * lprocfs_register() in seq_client_proc_init(); only a real directory
 * is removed.
 */
312 if (!IS_ERR(seq->lcs_proc_dir))
313 lprocfs_remove(&seq->lcs_proc_dir);
314 seq->lcs_proc_dir = NULL;
/*
 * NOTE(review): second definitions of the two proc helpers.  Presumably
 * these are the stub variants compiled when lprocfs support is disabled;
 * the preprocessor guard and the bodies are not visible here -- confirm.
 */
319 static int seq_client_proc_init(struct lu_client_seq *seq)
324 static void seq_client_proc_fini(struct lu_client_seq *seq)
/*
 * Initialise the client sequence manager @seq.
 *
 * Records @type in lcs_type, initialises the lcs_sem semaphore with an
 * initial count of 1, sets lcs_width to LUSTRE_SEQ_MAX_WIDTH and clears the
 * FID/range state with seq_client_flush().  A reference on the export is
 * taken with class_export_get(), lcs_name is formatted with snprintf() and
 * the proc entries are set up through seq_client_proc_init().
 *
 * `prefix` is asserted non-NULL; its declaration is on a parameter line not
 * visible here.
 * NOTE(review): the assignments of @exp / @srv into lcs_exp / lcs_srv and
 * the conditions guarding the two LASSERTs below are not visible -- confirm.
 */
330 int seq_client_init(struct lu_client_seq *seq,
331 struct obd_export *exp,
332 enum lu_cli_type type,
334 struct lu_server_seq *srv)
339 LASSERT(seq != NULL);
340 LASSERT(prefix != NULL);
344 seq->lcs_type = type;
345 sema_init(&seq->lcs_sem, 1);
346 seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
348 /* Make sure that things are clear before work is started. */
349 seq_client_flush(seq);
352 LASSERT(seq->lcs_srv != NULL);
354 LASSERT(seq->lcs_exp != NULL);
/* Hold a reference on the export; seq_client_fini() drops it. */
355 seq->lcs_exp = class_export_get(seq->lcs_exp);
358 snprintf(seq->lcs_name, sizeof(seq->lcs_name),
361 rc = seq_client_proc_init(seq);
/* NOTE(review): presumably reached only when proc init failed -- confirm. */
363 seq_client_fini(seq);
366 EXPORT_SYMBOL(seq_client_init);
/*
 * Tear down the client sequence manager @seq: remove its proc entries and,
 * if an export is attached, drop the reference on it with
 * class_export_put().
 */
368 void seq_client_fini(struct lu_client_seq *seq)
372 seq_client_proc_fini(seq);
/* Counterpart of the class_export_get() done in seq_client_init(). */
374 if (seq->lcs_exp != NULL) {
375 class_export_put(seq->lcs_exp);
382 EXPORT_SYMBOL(seq_client_fini);