Whamcloud - gitweb
b=14149
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_request.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 /* mdc RPC locks */
49 #include <lustre_mdc.h>
50 #include "fid_internal.h"
51
52 static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
53                           struct lu_range *output, __u32 opc,
54                           const char *opcname)
55 {
56         struct obd_export     *exp = seq->lcs_exp;
57         struct ptlrpc_request *req;
58         struct lu_range       *out, *in;
59         __u32                 *op;
60         int                    rc;
61         ENTRY;
62
63         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
64                                         LUSTRE_MDS_VERSION, SEQ_QUERY);
65         if (req == NULL)
66                 RETURN(-ENOMEM);
67
68         /* Init operation code */
69         op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
70         *op = opc;
71
72         /* Zero out input range, this is not recovery yet. */
73         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
74         if (input != NULL)
75                 *in = *input;
76         else
77                 range_zero(in);
78
79         ptlrpc_request_set_replen(req);
80
81         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
82                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
83                         SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
84         } else {
85                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
86                         SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
87         }
88
89         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
90         rc = ptlrpc_queue_wait(req);
91         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
92
93         if (rc)
94                 GOTO(out_req, rc);
95
96         out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
97         *output = *out;
98
99         if (!range_is_sane(output)) {
100                 CERROR("%s: Invalid range received from server: "
101                        DRANGE"\n", seq->lcs_name, PRANGE(output));
102                 GOTO(out_req, rc = -EINVAL);
103         }
104
105         if (range_is_exhausted(output)) {
106                 CERROR("%s: Range received from server is exhausted: "
107                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
108                 GOTO(out_req, rc = -EINVAL);
109         }
110         *in = *out;
111
112         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
113                seq->lcs_name, opcname, PRANGE(output));
114
115         EXIT;
116 out_req:
117         ptlrpc_req_finished(req);
118         return rc;
119 }
120
121 /* Request sequence-controller node to allocate new super-sequence. */
122 int seq_client_replay_super(struct lu_client_seq *seq,
123                             struct lu_range *range,
124                             const struct lu_env *env)
125 {
126         int rc;
127         ENTRY;
128
129         down(&seq->lcs_sem);
130
131 #ifdef __KERNEL__
132         if (seq->lcs_srv) {
133                 LASSERT(env != NULL);
134                 rc = seq_server_alloc_super(seq->lcs_srv, range,
135                                             &seq->lcs_space, env);
136         } else {
137 #endif
138                 rc = seq_client_rpc(seq, range, &seq->lcs_space,
139                                     SEQ_ALLOC_SUPER, "super");
140 #ifdef __KERNEL__
141         }
142 #endif
143         up(&seq->lcs_sem);
144         RETURN(rc);
145 }
146
147 /* Request sequence-controller node to allocate new super-sequence. */
148 int seq_client_alloc_super(struct lu_client_seq *seq,
149                            const struct lu_env *env)
150 {
151         ENTRY;
152         RETURN(seq_client_replay_super(seq, NULL, env));
153 }
154
155 /* Request sequence-controller node to allocate new meta-sequence. */
156 static int seq_client_alloc_meta(struct lu_client_seq *seq,
157                                  const struct lu_env *env)
158 {
159         int rc;
160         ENTRY;
161
162 #ifdef __KERNEL__
163         if (seq->lcs_srv) {
164                 LASSERT(env != NULL);
165                 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
166                                            &seq->lcs_space, env);
167         } else {
168 #endif
169                 rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
170                                     SEQ_ALLOC_META, "meta");
171 #ifdef __KERNEL__
172         }
173 #endif
174         RETURN(rc);
175 }
176
177 /* Allocate new sequence for client. */
178 static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
179 {
180         int rc;
181         ENTRY;
182
183         LASSERT(range_is_sane(&seq->lcs_space));
184
185         if (range_is_exhausted(&seq->lcs_space)) {
186                 rc = seq_client_alloc_meta(seq, NULL);
187                 if (rc) {
188                         CERROR("%s: Can't allocate new meta-sequence, "
189                                "rc %d\n", seq->lcs_name, rc);
190                         RETURN(rc);
191                 } else {
192                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
193                                seq->lcs_name, PRANGE(&seq->lcs_space));
194                 }
195         } else {
196                 rc = 0;
197         }
198
199         LASSERT(!range_is_exhausted(&seq->lcs_space));
200         *seqnr = seq->lcs_space.lr_start;
201         seq->lcs_space.lr_start += 1;
202
203         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
204                *seqnr);
205
206         RETURN(rc);
207 }
208
209 /* Allocate new fid on passed client @seq and save it to @fid. */
210 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
211 {
212         int rc;
213         ENTRY;
214
215         LASSERT(seq != NULL);
216         LASSERT(fid != NULL);
217
218         down(&seq->lcs_sem);
219
220         if (fid_is_zero(&seq->lcs_fid) ||
221             fid_oid(&seq->lcs_fid) >= seq->lcs_width)
222         {
223                 seqno_t seqnr;
224
225                 rc = seq_client_alloc_seq(seq, &seqnr);
226                 if (rc) {
227                         CERROR("%s: Can't allocate new sequence, "
228                                "rc %d\n", seq->lcs_name, rc);
229                         up(&seq->lcs_sem);
230                         RETURN(rc);
231                 }
232
233                 CDEBUG(D_INFO, "%s: Switch to sequence "
234                        "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
235
236                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
237                 seq->lcs_fid.f_seq = seqnr;
238                 seq->lcs_fid.f_ver = 0;
239
240                 /*
241                  * Inform caller that sequence switch is performed to allow it
242                  * to setup FLD for it.
243                  */
244                 rc = 1;
245         } else {
246                 /* Just bump last allocated fid and return to caller. */
247                 seq->lcs_fid.f_oid += 1;
248                 rc = 0;
249         }
250
251         *fid = seq->lcs_fid;
252         up(&seq->lcs_sem);
253
254         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
255         RETURN(rc);
256 }
257 EXPORT_SYMBOL(seq_client_alloc_fid);
258
259 /*
260  * Finish the current sequence due to disconnect.
261  * See mdc_import_event()
262  */
263 void seq_client_flush(struct lu_client_seq *seq)
264 {
265         LASSERT(seq != NULL);
266         down(&seq->lcs_sem);
267         fid_zero(&seq->lcs_fid);
268         range_zero(&seq->lcs_space);
269         up(&seq->lcs_sem);
270 }
271 EXPORT_SYMBOL(seq_client_flush);
272
273 static void seq_client_proc_fini(struct lu_client_seq *seq);
274
275 #ifdef LPROCFS
276 static int seq_client_proc_init(struct lu_client_seq *seq)
277 {
278         int rc;
279         ENTRY;
280
281         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
282                                              seq_type_proc_dir,
283                                              NULL, NULL);
284
285         if (IS_ERR(seq->lcs_proc_dir)) {
286                 CERROR("%s: LProcFS failed in seq-init\n",
287                        seq->lcs_name);
288                 rc = PTR_ERR(seq->lcs_proc_dir);
289                 RETURN(rc);
290         }
291
292         rc = lprocfs_add_vars(seq->lcs_proc_dir,
293                               seq_client_proc_list, seq);
294         if (rc) {
295                 CERROR("%s: Can't init sequence manager "
296                        "proc, rc %d\n", seq->lcs_name, rc);
297                 GOTO(out_cleanup, rc);
298         }
299
300         RETURN(0);
301
302 out_cleanup:
303         seq_client_proc_fini(seq);
304         return rc;
305 }
306
307 static void seq_client_proc_fini(struct lu_client_seq *seq)
308 {
309         ENTRY;
310         if (seq->lcs_proc_dir) {
311                 if (!IS_ERR(seq->lcs_proc_dir))
312                         lprocfs_remove(&seq->lcs_proc_dir);
313                 seq->lcs_proc_dir = NULL;
314         }
315         EXIT;
316 }
317 #else
318 static int seq_client_proc_init(struct lu_client_seq *seq)
319 {
320         return 0;
321 }
322
323 static void seq_client_proc_fini(struct lu_client_seq *seq)
324 {
325         return;
326 }
327 #endif
328
329 int seq_client_init(struct lu_client_seq *seq,
330                     struct obd_export *exp,
331                     enum lu_cli_type type,
332                     const char *prefix,
333                     struct lu_server_seq *srv)
334 {
335         int rc;
336         ENTRY;
337
338         LASSERT(seq != NULL);
339         LASSERT(prefix != NULL);
340
341         seq->lcs_exp = exp;
342         seq->lcs_srv = srv;
343         seq->lcs_type = type;
344         sema_init(&seq->lcs_sem, 1);
345         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
346
347         /* Make sure that things are clear before work is started. */
348         seq_client_flush(seq);
349
350         if (exp == NULL) {
351                 LASSERT(seq->lcs_srv != NULL);
352         } else {
353                 LASSERT(seq->lcs_exp != NULL);
354                 seq->lcs_exp = class_export_get(seq->lcs_exp);
355         }
356
357         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
358                  "cli-%s", prefix);
359
360         rc = seq_client_proc_init(seq);
361         if (rc)
362                 seq_client_fini(seq);
363         RETURN(rc);
364 }
365 EXPORT_SYMBOL(seq_client_init);
366
367 void seq_client_fini(struct lu_client_seq *seq)
368 {
369         ENTRY;
370
371         seq_client_proc_fini(seq);
372
373         if (seq->lcs_exp != NULL) {
374                 class_export_put(seq->lcs_exp);
375                 seq->lcs_exp = NULL;
376         }
377
378         seq->lcs_srv = NULL;
379         EXIT;
380 }
381 EXPORT_SYMBOL(seq_client_fini);