1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_request.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
50 static int seq_client_rpc(struct lu_client_seq *seq,
51 struct lu_range *range,
52 __u32 opc, const char *opcname)
54 int rc, size[3] = { sizeof(struct ptlrpc_body),
56 sizeof(struct lu_range) };
57 struct obd_export *exp = seq->lcs_exp;
58 struct ptlrpc_request *req;
59 struct lu_range *out, *in;
60 struct req_capsule pill;
64 req = ptlrpc_prep_req(class_exp2cliimp(exp),
71 req_capsule_init(&pill, req, RCL_CLIENT, NULL);
72 req_capsule_set(&pill, &RQF_SEQ_QUERY);
74 /* init operation code */
75 op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
78 /* zero out input range, this is not recovery yet. */
79 in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
82 size[1] = sizeof(struct lu_range);
83 ptlrpc_req_set_repsize(req, 2, size);
85 if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
86 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
87 SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
89 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
90 SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
93 rc = ptlrpc_queue_wait(req);
97 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
100 if (!range_is_sane(range)) {
101 CERROR("invalid seq range obtained from server: "
102 DRANGE"\n", PRANGE(range));
103 GOTO(out_req, rc = -EINVAL);
106 if (range_is_exhausted(range)) {
107 CERROR("seq range obtained from server is exhausted: "
108 DRANGE"]\n", PRANGE(range));
109 GOTO(out_req, rc = -EINVAL);
112 /* Save server out to request for recovery case. */
115 CDEBUG(D_INFO, "%s: allocated %s-sequence "
116 DRANGE"]\n", seq->lcs_name, opcname,
121 req_capsule_fini(&pill);
122 ptlrpc_req_finished(req);
126 /* request sequence-controller node to allocate new super-sequence. */
127 static int __seq_client_alloc_super(struct lu_client_seq *seq)
133 rc = seq_server_alloc_super(seq->lcs_srv, NULL,
138 rc = seq_client_rpc(seq, &seq->lcs_range,
139 SEQ_ALLOC_SUPER, "super");
146 int seq_client_alloc_super(struct lu_client_seq *seq)
152 rc = __seq_client_alloc_super(seq);
157 EXPORT_SYMBOL(seq_client_alloc_super);
159 /* request sequence-controller node to allocate new meta-sequence. */
160 static int __seq_client_alloc_meta(struct lu_client_seq *seq)
166 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
171 rc = seq_client_rpc(seq, &seq->lcs_range,
172 SEQ_ALLOC_META, "meta");
179 int seq_client_alloc_meta(struct lu_client_seq *seq)
185 rc = __seq_client_alloc_meta(seq);
190 EXPORT_SYMBOL(seq_client_alloc_meta);
192 /* allocate new sequence for client (llite or MDC are expected to use this) */
193 static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
198 LASSERT(range_is_sane(&seq->lcs_range));
200 /* if we still have free sequences in meta-sequence we allocate new seq
201 * from given range, if not - allocate new meta-sequence. */
202 if (range_space(&seq->lcs_range) == 0) {
203 rc = __seq_client_alloc_meta(seq);
205 CERROR("can't allocate new meta-sequence, "
211 LASSERT(range_space(&seq->lcs_range) > 0);
212 *seqnr = seq->lcs_range.lr_start;
213 seq->lcs_range.lr_start++;
215 CDEBUG(D_INFO, "%s: allocated sequence ["LPX64"]\n",
216 seq->lcs_name, *seqnr);
220 int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
226 rc = __seq_client_alloc_seq(seq, seqnr);
231 EXPORT_SYMBOL(seq_client_alloc_seq);
233 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
238 LASSERT(fid != NULL);
242 if (!fid_is_sane(&seq->lcs_fid) ||
243 fid_oid(&seq->lcs_fid) >= seq->lcs_width)
247 /* allocate new sequence for case client has no sequence at all
248 * or sequence is exhausted and should be switched. */
249 rc = __seq_client_alloc_seq(seq, &seqnr);
251 CERROR("can't allocate new sequence, "
257 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
258 seq->lcs_fid.f_seq = seqnr;
259 seq->lcs_fid.f_ver = 0;
261 /* inform caller that sequence switch is performed to allow it
262 * to setup FLD for it. */
265 seq->lcs_fid.f_oid++;
270 LASSERT(fid_is_sane(fid));
272 CDEBUG(D_INFO, "%s: allocated FID "DFID"\n",
273 seq->lcs_name, PFID(fid));
280 EXPORT_SYMBOL(seq_client_alloc_fid);
282 static void seq_client_proc_fini(struct lu_client_seq *seq);
285 static int seq_client_proc_init(struct lu_client_seq *seq)
290 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
294 if (IS_ERR(seq->lcs_proc_dir)) {
295 CERROR("LProcFS failed in seq-init\n");
296 rc = PTR_ERR(seq->lcs_proc_dir);
300 rc = lprocfs_add_vars(seq->lcs_proc_dir,
301 seq_client_proc_list, seq);
303 CERROR("can't init sequence manager "
304 "proc, rc %d\n", rc);
305 GOTO(out_cleanup, rc);
311 seq_client_proc_fini(seq);
315 static void seq_client_proc_fini(struct lu_client_seq *seq)
318 if (seq->lcs_proc_dir) {
319 if (!IS_ERR(seq->lcs_proc_dir))
320 lprocfs_remove(seq->lcs_proc_dir);
321 seq->lcs_proc_dir = NULL;
326 static int seq_client_proc_init(struct lu_client_seq *seq)
331 static void seq_client_proc_fini(struct lu_client_seq *seq)
337 int seq_client_init(struct lu_client_seq *seq,
338 struct obd_export *exp,
339 enum lu_cli_type type,
341 struct lu_server_seq *srv,
342 const struct lu_context *ctx)
347 LASSERT(seq != NULL);
348 LASSERT(prefix != NULL);
353 seq->lcs_type = type;
354 fid_zero(&seq->lcs_fid);
355 range_zero(&seq->lcs_range);
356 sema_init(&seq->lcs_sem, 1);
357 seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
360 LASSERT(seq->lcs_ctx != NULL);
361 LASSERT(seq->lcs_srv != NULL);
363 LASSERT(seq->lcs_exp != NULL);
364 seq->lcs_exp = class_export_get(seq->lcs_exp);
367 snprintf(seq->lcs_name, sizeof(seq->lcs_name),
368 "%s-cli-%s", LUSTRE_SEQ_NAME, prefix);
370 rc = seq_client_proc_init(seq);
372 seq_client_fini(seq);
374 CDEBUG(D_INFO|D_WARNING,
375 "Client Sequence Manager\n");
378 EXPORT_SYMBOL(seq_client_init);
380 void seq_client_fini(struct lu_client_seq *seq)
384 seq_client_proc_fini(seq);
386 if (seq->lcs_exp != NULL) {
387 class_export_put(seq->lcs_exp);
393 CDEBUG(D_INFO|D_WARNING,
394 "Client Sequence Manager\n");
398 EXPORT_SYMBOL(seq_client_fini);