Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_request.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 /* mdc RPC locks */
49 #include <lustre_mdc.h>
50 #include "fid_internal.h"
51
52 static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
53                           struct lu_range *output, __u32 opc,
54                           const char *opcname)
55 {
56         struct obd_export     *exp = seq->lcs_exp;
57         struct ptlrpc_request *req;
58         struct lu_range       *out, *in;
59         __u32                 *op;
60         int                    rc;
61         ENTRY;
62
63         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
64                                         LUSTRE_MDS_VERSION, SEQ_QUERY);
65         if (req == NULL)
66                 RETURN(-ENOMEM);
67
68         /* Init operation code */
69         op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
70         *op = opc;
71
72         /* Zero out input range, this is not recovery yet. */
73         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
74         if (input != NULL)
75                 *in = *input;
76         else
77                 range_zero(in);
78
79         ptlrpc_request_set_replen(req);
80
81         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
82                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
83                         SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
84         } else {
85                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
86                         SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
87         }
88         ptlrpc_at_set_req_timeout(req);
89
90         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
91         rc = ptlrpc_queue_wait(req);
92         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
93
94         if (rc)
95                 GOTO(out_req, rc);
96
97         out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
98         *output = *out;
99
100         if (!range_is_sane(output)) {
101                 CERROR("%s: Invalid range received from server: "
102                        DRANGE"\n", seq->lcs_name, PRANGE(output));
103                 GOTO(out_req, rc = -EINVAL);
104         }
105
106         if (range_is_exhausted(output)) {
107                 CERROR("%s: Range received from server is exhausted: "
108                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
109                 GOTO(out_req, rc = -EINVAL);
110         }
111         *in = *out;
112
113         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
114                seq->lcs_name, opcname, PRANGE(output));
115
116         EXIT;
117 out_req:
118         ptlrpc_req_finished(req);
119         return rc;
120 }
121
122 /* Request sequence-controller node to allocate new super-sequence. */
123 int seq_client_replay_super(struct lu_client_seq *seq,
124                             struct lu_range *range,
125                             const struct lu_env *env)
126 {
127         int rc;
128         ENTRY;
129
130         down(&seq->lcs_sem);
131
132 #ifdef __KERNEL__
133         if (seq->lcs_srv) {
134                 LASSERT(env != NULL);
135                 rc = seq_server_alloc_super(seq->lcs_srv, range,
136                                             &seq->lcs_space, env);
137         } else {
138 #endif
139                 rc = seq_client_rpc(seq, range, &seq->lcs_space,
140                                     SEQ_ALLOC_SUPER, "super");
141 #ifdef __KERNEL__
142         }
143 #endif
144         up(&seq->lcs_sem);
145         RETURN(rc);
146 }
147
148 /* Request sequence-controller node to allocate new super-sequence. */
149 int seq_client_alloc_super(struct lu_client_seq *seq,
150                            const struct lu_env *env)
151 {
152         ENTRY;
153         RETURN(seq_client_replay_super(seq, NULL, env));
154 }
155
156 /* Request sequence-controller node to allocate new meta-sequence. */
157 static int seq_client_alloc_meta(struct lu_client_seq *seq,
158                                  const struct lu_env *env)
159 {
160         int rc;
161         ENTRY;
162
163 #ifdef __KERNEL__
164         if (seq->lcs_srv) {
165                 LASSERT(env != NULL);
166                 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
167                                            &seq->lcs_space, env);
168         } else {
169 #endif
170                 rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
171                                     SEQ_ALLOC_META, "meta");
172 #ifdef __KERNEL__
173         }
174 #endif
175         RETURN(rc);
176 }
177
178 /* Allocate new sequence for client. */
179 static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
180 {
181         int rc;
182         ENTRY;
183
184         LASSERT(range_is_sane(&seq->lcs_space));
185
186         if (range_is_exhausted(&seq->lcs_space)) {
187                 rc = seq_client_alloc_meta(seq, NULL);
188                 if (rc) {
189                         CERROR("%s: Can't allocate new meta-sequence, "
190                                "rc %d\n", seq->lcs_name, rc);
191                         RETURN(rc);
192                 } else {
193                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
194                                seq->lcs_name, PRANGE(&seq->lcs_space));
195                 }
196         } else {
197                 rc = 0;
198         }
199
200         LASSERT(!range_is_exhausted(&seq->lcs_space));
201         *seqnr = seq->lcs_space.lr_start;
202         seq->lcs_space.lr_start += 1;
203
204         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
205                *seqnr);
206
207         RETURN(rc);
208 }
209
210 /* Allocate new fid on passed client @seq and save it to @fid. */
211 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
212 {
213         int rc;
214         ENTRY;
215
216         LASSERT(seq != NULL);
217         LASSERT(fid != NULL);
218
219         down(&seq->lcs_sem);
220
221         if (fid_is_zero(&seq->lcs_fid) ||
222             fid_oid(&seq->lcs_fid) >= seq->lcs_width)
223         {
224                 seqno_t seqnr;
225
226                 rc = seq_client_alloc_seq(seq, &seqnr);
227                 if (rc) {
228                         CERROR("%s: Can't allocate new sequence, "
229                                "rc %d\n", seq->lcs_name, rc);
230                         up(&seq->lcs_sem);
231                         RETURN(rc);
232                 }
233
234                 CDEBUG(D_INFO, "%s: Switch to sequence "
235                        "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
236
237                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
238                 seq->lcs_fid.f_seq = seqnr;
239                 seq->lcs_fid.f_ver = 0;
240
241                 /*
242                  * Inform caller that sequence switch is performed to allow it
243                  * to setup FLD for it.
244                  */
245                 rc = 1;
246         } else {
247                 /* Just bump last allocated fid and return to caller. */
248                 seq->lcs_fid.f_oid += 1;
249                 rc = 0;
250         }
251
252         *fid = seq->lcs_fid;
253         up(&seq->lcs_sem);
254
255         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
256         RETURN(rc);
257 }
258 EXPORT_SYMBOL(seq_client_alloc_fid);
259
260 /*
261  * Finish the current sequence due to disconnect.
262  * See mdc_import_event()
263  */
264 void seq_client_flush(struct lu_client_seq *seq)
265 {
266         LASSERT(seq != NULL);
267         down(&seq->lcs_sem);
268         fid_zero(&seq->lcs_fid);
269         range_zero(&seq->lcs_space);
270         up(&seq->lcs_sem);
271 }
272 EXPORT_SYMBOL(seq_client_flush);
273
274 static void seq_client_proc_fini(struct lu_client_seq *seq);
275
276 #ifdef LPROCFS
277 static int seq_client_proc_init(struct lu_client_seq *seq)
278 {
279         int rc;
280         ENTRY;
281
282         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
283                                              seq_type_proc_dir,
284                                              NULL, NULL);
285
286         if (IS_ERR(seq->lcs_proc_dir)) {
287                 CERROR("%s: LProcFS failed in seq-init\n",
288                        seq->lcs_name);
289                 rc = PTR_ERR(seq->lcs_proc_dir);
290                 RETURN(rc);
291         }
292
293         rc = lprocfs_add_vars(seq->lcs_proc_dir,
294                               seq_client_proc_list, seq);
295         if (rc) {
296                 CERROR("%s: Can't init sequence manager "
297                        "proc, rc %d\n", seq->lcs_name, rc);
298                 GOTO(out_cleanup, rc);
299         }
300
301         RETURN(0);
302
303 out_cleanup:
304         seq_client_proc_fini(seq);
305         return rc;
306 }
307
308 static void seq_client_proc_fini(struct lu_client_seq *seq)
309 {
310         ENTRY;
311         if (seq->lcs_proc_dir) {
312                 if (!IS_ERR(seq->lcs_proc_dir))
313                         lprocfs_remove(&seq->lcs_proc_dir);
314                 seq->lcs_proc_dir = NULL;
315         }
316         EXIT;
317 }
318 #else
319 static int seq_client_proc_init(struct lu_client_seq *seq)
320 {
321         return 0;
322 }
323
324 static void seq_client_proc_fini(struct lu_client_seq *seq)
325 {
326         return;
327 }
328 #endif
329
330 int seq_client_init(struct lu_client_seq *seq,
331                     struct obd_export *exp,
332                     enum lu_cli_type type,
333                     const char *prefix,
334                     struct lu_server_seq *srv)
335 {
336         int rc;
337         ENTRY;
338
339         LASSERT(seq != NULL);
340         LASSERT(prefix != NULL);
341
342         seq->lcs_exp = exp;
343         seq->lcs_srv = srv;
344         seq->lcs_type = type;
345         sema_init(&seq->lcs_sem, 1);
346         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
347
348         /* Make sure that things are clear before work is started. */
349         seq_client_flush(seq);
350
351         if (exp == NULL) {
352                 LASSERT(seq->lcs_srv != NULL);
353         } else {
354                 LASSERT(seq->lcs_exp != NULL);
355                 seq->lcs_exp = class_export_get(seq->lcs_exp);
356         }
357
358         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
359                  "cli-%s", prefix);
360
361         rc = seq_client_proc_init(seq);
362         if (rc)
363                 seq_client_fini(seq);
364         RETURN(rc);
365 }
366 EXPORT_SYMBOL(seq_client_init);
367
368 void seq_client_fini(struct lu_client_seq *seq)
369 {
370         ENTRY;
371
372         seq_client_proc_fini(seq);
373
374         if (seq->lcs_exp != NULL) {
375                 class_export_put(seq->lcs_exp);
376                 seq->lcs_exp = NULL;
377         }
378
379         seq->lcs_srv = NULL;
380         EXIT;
381 }
382 EXPORT_SYMBOL(seq_client_fini);