Whamcloud - gitweb
LU-1303 lod: transfer default striping from parent/fs
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/fid/fid_request.c
37  *
38  * Lustre Sequence Manager
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_FID
44
45 #ifdef __KERNEL__
46 # include <libcfs/libcfs.h>
47 # include <linux/module.h>
48 #else /* __KERNEL__ */
49 # include <liblustre.h>
50 #endif
51
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <dt_object.h>
55 #include <md_object.h>
56 #include <obd_support.h>
57 #include <lustre_req_layout.h>
58 #include <lustre_fid.h>
59 /* mdc RPC locks */
60 #include <lustre_mdc.h>
61 #include "fid_internal.h"
62
63 static int seq_client_rpc(struct lu_client_seq *seq,
64                           struct lu_seq_range *output, __u32 opc,
65                           const char *opcname)
66 {
67         struct obd_export     *exp = seq->lcs_exp;
68         struct ptlrpc_request *req;
69         struct lu_seq_range   *out, *in;
70         __u32                 *op;
71         unsigned int           debug_mask;
72         int                    rc;
73         ENTRY;
74
75         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
76                                         LUSTRE_MDS_VERSION, SEQ_QUERY);
77         if (req == NULL)
78                 RETURN(-ENOMEM);
79
80         /* Init operation code */
81         op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
82         *op = opc;
83
84         /* Zero out input range, this is not recovery yet. */
85         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
86         range_init(in);
87
88         ptlrpc_request_set_replen(req);
89
90         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
91                 req->rq_request_portal = SEQ_METADATA_PORTAL;
92                 in->lsr_flags = LU_SEQ_RANGE_MDT;
93         } else {
94                 LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA,
95                          "unknown lcs_type %u\n", seq->lcs_type);
96                 req->rq_request_portal = SEQ_DATA_PORTAL;
97                 in->lsr_flags = LU_SEQ_RANGE_OST;
98         }
99
100         if (opc == SEQ_ALLOC_SUPER) {
101                 /* Update index field of *in, it is required for
102                  * FLD update on super sequence allocator node. */
103                 in->lsr_index = seq->lcs_space.lsr_index;
104                 req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
105                 debug_mask = D_CONSOLE;
106         } else {
107                 debug_mask = D_INFO;
108                 LASSERTF(opc == SEQ_ALLOC_META,
109                          "unknown opcode %u\n, opc", opc);
110         }
111
112         ptlrpc_at_set_req_timeout(req);
113
114         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
115         rc = ptlrpc_queue_wait(req);
116         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
117
118         if (rc)
119                 GOTO(out_req, rc);
120
121         out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
122         *output = *out;
123
124         if (!range_is_sane(output)) {
125                 CERROR("%s: Invalid range received from server: "
126                        DRANGE"\n", seq->lcs_name, PRANGE(output));
127                 GOTO(out_req, rc = -EINVAL);
128         }
129
130         if (range_is_exhausted(output)) {
131                 CERROR("%s: Range received from server is exhausted: "
132                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
133                 GOTO(out_req, rc = -EINVAL);
134         }
135
136         CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n",
137                      seq->lcs_name, opcname, PRANGE(output));
138
139         EXIT;
140 out_req:
141         ptlrpc_req_finished(req);
142         return rc;
143 }
144
145 /* Request sequence-controller node to allocate new super-sequence. */
146 int seq_client_alloc_super(struct lu_client_seq *seq,
147                            const struct lu_env *env)
148 {
149         int rc;
150         ENTRY;
151
152         cfs_mutex_lock(&seq->lcs_mutex);
153
154 #ifdef __KERNEL__
155         if (seq->lcs_srv) {
156                 LASSERT(env != NULL);
157                 rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space,
158                                             env);
159         } else {
160 #endif
161                 rc = seq_client_rpc(seq, &seq->lcs_space,
162                                     SEQ_ALLOC_SUPER, "super");
163 #ifdef __KERNEL__
164         }
165 #endif
166         cfs_mutex_unlock(&seq->lcs_mutex);
167         RETURN(rc);
168 }
169
170 /* Request sequence-controller node to allocate new meta-sequence. */
171 static int seq_client_alloc_meta(const struct lu_env *env,
172                                  struct lu_client_seq *seq)
173 {
174         int rc;
175         ENTRY;
176
177 #ifdef __KERNEL__
178         if (seq->lcs_srv) {
179                 LASSERT(env != NULL);
180                 rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env);
181         } else {
182 #endif
183                 rc = seq_client_rpc(seq, &seq->lcs_space,
184                                     SEQ_ALLOC_META, "meta");
185 #ifdef __KERNEL__
186         }
187 #endif
188         RETURN(rc);
189 }
190
191 /* Allocate new sequence for client. */
192 static int seq_client_alloc_seq(const struct lu_env *env,
193                                 struct lu_client_seq *seq, seqno_t *seqnr)
194 {
195         int rc;
196         ENTRY;
197
198         LASSERT(range_is_sane(&seq->lcs_space));
199
200         if (range_is_exhausted(&seq->lcs_space)) {
201                 rc = seq_client_alloc_meta(env, seq);
202                 if (rc) {
203                         CERROR("%s: Can't allocate new meta-sequence,"
204                                "rc %d\n", seq->lcs_name, rc);
205                         RETURN(rc);
206                 } else {
207                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
208                                seq->lcs_name, PRANGE(&seq->lcs_space));
209                 }
210         } else {
211                 rc = 0;
212         }
213
214         LASSERT(!range_is_exhausted(&seq->lcs_space));
215         *seqnr = seq->lcs_space.lsr_start;
216         seq->lcs_space.lsr_start += 1;
217
218         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
219                *seqnr);
220
221         RETURN(rc);
222 }
223
224 static int seq_fid_alloc_prep(struct lu_client_seq *seq,
225                               cfs_waitlink_t *link)
226 {
227         if (seq->lcs_update) {
228                 cfs_waitq_add(&seq->lcs_waitq, link);
229                 cfs_set_current_state(CFS_TASK_UNINT);
230                 cfs_mutex_unlock(&seq->lcs_mutex);
231
232                 cfs_waitq_wait(link, CFS_TASK_UNINT);
233
234                 cfs_mutex_lock(&seq->lcs_mutex);
235                 cfs_waitq_del(&seq->lcs_waitq, link);
236                 cfs_set_current_state(CFS_TASK_RUNNING);
237                 return -EAGAIN;
238         }
239         ++seq->lcs_update;
240         cfs_mutex_unlock(&seq->lcs_mutex);
241         return 0;
242 }
243
244 static void seq_fid_alloc_fini(struct lu_client_seq *seq)
245 {
246         LASSERT(seq->lcs_update == 1);
247         cfs_mutex_lock(&seq->lcs_mutex);
248         --seq->lcs_update;
249         cfs_waitq_signal(&seq->lcs_waitq);
250 }
251
252 /* Allocate the whole seq to the caller*/
253 int seq_client_get_seq(const struct lu_env *env,
254                        struct lu_client_seq *seq, seqno_t *seqnr)
255 {
256         cfs_waitlink_t link;
257         int rc;
258
259         LASSERT(seqnr != NULL);
260         cfs_mutex_lock(&seq->lcs_mutex);
261         cfs_waitlink_init(&link);
262
263         while (1) {
264                 rc = seq_fid_alloc_prep(seq, &link);
265                 if (rc == 0)
266                         break;
267         }
268
269         rc = seq_client_alloc_seq(env, seq, seqnr);
270         if (rc) {
271                 CERROR("%s: Can't allocate new sequence, "
272                        "rc %d\n", seq->lcs_name, rc);
273                 seq_fid_alloc_fini(seq);
274                 cfs_mutex_unlock(&seq->lcs_mutex);
275                 return rc;
276         }
277
278         CDEBUG(D_INFO, "%s: allocate sequence "
279                "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr);
280
281         /*Since the caller require the whole seq,
282          *so marked this seq to be used*/
283         seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH;
284         seq->lcs_fid.f_seq = *seqnr;
285         seq->lcs_fid.f_ver = 0;
286
287         /*
288          * Inform caller that sequence switch is performed to allow it
289          * to setup FLD for it.
290          */
291         seq_fid_alloc_fini(seq);
292         cfs_mutex_unlock(&seq->lcs_mutex);
293
294         return rc;
295 }
296 EXPORT_SYMBOL(seq_client_get_seq);
297
298 /* Allocate new fid on passed client @seq and save it to @fid. */
299 int seq_client_alloc_fid(const struct lu_env *env,
300                          struct lu_client_seq *seq, struct lu_fid *fid)
301 {
302         cfs_waitlink_t link;
303         int rc;
304         ENTRY;
305
306         LASSERT(seq != NULL);
307         LASSERT(fid != NULL);
308
309         cfs_waitlink_init(&link);
310         cfs_mutex_lock(&seq->lcs_mutex);
311
312         if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
313                 seq->lcs_fid.f_oid = seq->lcs_width;
314
315         while (1) {
316                 seqno_t seqnr;
317
318                 if (!fid_is_zero(&seq->lcs_fid) &&
319                     fid_oid(&seq->lcs_fid) < seq->lcs_width) {
320                         /* Just bump last allocated fid and return to caller. */
321                         seq->lcs_fid.f_oid += 1;
322                         rc = 0;
323                         break;
324                 }
325
326                 rc = seq_fid_alloc_prep(seq, &link);
327                 if (rc)
328                         continue;
329
330                 rc = seq_client_alloc_seq(env, seq, &seqnr);
331                 if (rc) {
332                         CERROR("%s: Can't allocate new sequence, "
333                                "rc %d\n", seq->lcs_name, rc);
334                         seq_fid_alloc_fini(seq);
335                         cfs_mutex_unlock(&seq->lcs_mutex);
336                         RETURN(rc);
337                 }
338
339                 CDEBUG(D_INFO, "%s: Switch to sequence "
340                        "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
341
342                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
343                 seq->lcs_fid.f_seq = seqnr;
344                 seq->lcs_fid.f_ver = 0;
345
346                 /*
347                  * Inform caller that sequence switch is performed to allow it
348                  * to setup FLD for it.
349                  */
350                 rc = 1;
351
352                 seq_fid_alloc_fini(seq);
353                 break;
354         }
355
356         *fid = seq->lcs_fid;
357         cfs_mutex_unlock(&seq->lcs_mutex);
358
359         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
360         RETURN(rc);
361 }
362 EXPORT_SYMBOL(seq_client_alloc_fid);
363
364 /*
365  * Finish the current sequence due to disconnect.
366  * See mdc_import_event()
367  */
368 void seq_client_flush(struct lu_client_seq *seq)
369 {
370         cfs_waitlink_t link;
371
372         LASSERT(seq != NULL);
373         cfs_waitlink_init(&link);
374         cfs_mutex_lock(&seq->lcs_mutex);
375
376         while (seq->lcs_update) {
377                 cfs_waitq_add(&seq->lcs_waitq, &link);
378                 cfs_set_current_state(CFS_TASK_UNINT);
379                 cfs_mutex_unlock(&seq->lcs_mutex);
380
381                 cfs_waitq_wait(&link, CFS_TASK_UNINT);
382
383                 cfs_mutex_lock(&seq->lcs_mutex);
384                 cfs_waitq_del(&seq->lcs_waitq, &link);
385                 cfs_set_current_state(CFS_TASK_RUNNING);
386         }
387
388         fid_zero(&seq->lcs_fid);
389         /**
390          * this id shld not be used for seq range allocation.
391          * set to -1 for dgb check.
392          */
393
394         seq->lcs_space.lsr_index = -1;
395
396         range_init(&seq->lcs_space);
397         cfs_mutex_unlock(&seq->lcs_mutex);
398 }
399 EXPORT_SYMBOL(seq_client_flush);
400
401 static void seq_client_proc_fini(struct lu_client_seq *seq);
402
403 #ifdef LPROCFS
404 static int seq_client_proc_init(struct lu_client_seq *seq)
405 {
406         int rc;
407         ENTRY;
408
409         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
410                                              seq_type_proc_dir,
411                                              NULL, NULL);
412
413         if (IS_ERR(seq->lcs_proc_dir)) {
414                 CERROR("%s: LProcFS failed in seq-init\n",
415                        seq->lcs_name);
416                 rc = PTR_ERR(seq->lcs_proc_dir);
417                 RETURN(rc);
418         }
419
420         rc = lprocfs_add_vars(seq->lcs_proc_dir,
421                               seq_client_proc_list, seq);
422         if (rc) {
423                 CERROR("%s: Can't init sequence manager "
424                        "proc, rc %d\n", seq->lcs_name, rc);
425                 GOTO(out_cleanup, rc);
426         }
427
428         RETURN(0);
429
430 out_cleanup:
431         seq_client_proc_fini(seq);
432         return rc;
433 }
434
435 static void seq_client_proc_fini(struct lu_client_seq *seq)
436 {
437         ENTRY;
438         if (seq->lcs_proc_dir) {
439                 if (!IS_ERR(seq->lcs_proc_dir))
440                         lprocfs_remove(&seq->lcs_proc_dir);
441                 seq->lcs_proc_dir = NULL;
442         }
443         EXIT;
444 }
445 #else
446 static int seq_client_proc_init(struct lu_client_seq *seq)
447 {
448         return 0;
449 }
450
451 static void seq_client_proc_fini(struct lu_client_seq *seq)
452 {
453         return;
454 }
455 #endif
456
457 int seq_client_init(struct lu_client_seq *seq,
458                     struct obd_export *exp,
459                     enum lu_cli_type type,
460                     const char *prefix,
461                     struct lu_server_seq *srv)
462 {
463         int rc;
464         ENTRY;
465
466         LASSERT(seq != NULL);
467         LASSERT(prefix != NULL);
468
469         seq->lcs_exp = exp;
470         seq->lcs_srv = srv;
471         seq->lcs_type = type;
472         cfs_mutex_init(&seq->lcs_mutex);
473         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
474         cfs_waitq_init(&seq->lcs_waitq);
475
476         /* Make sure that things are clear before work is started. */
477         seq_client_flush(seq);
478
479         if (exp == NULL) {
480                 LASSERT(seq->lcs_srv != NULL);
481         } else {
482                 LASSERT(seq->lcs_exp != NULL);
483                 seq->lcs_exp = class_export_get(seq->lcs_exp);
484         }
485
486         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
487                  "cli-%s", prefix);
488
489         rc = seq_client_proc_init(seq);
490         if (rc)
491                 seq_client_fini(seq);
492         RETURN(rc);
493 }
494 EXPORT_SYMBOL(seq_client_init);
495
496 void seq_client_fini(struct lu_client_seq *seq)
497 {
498         ENTRY;
499
500         seq_client_proc_fini(seq);
501
502         if (seq->lcs_exp != NULL) {
503                 class_export_put(seq->lcs_exp);
504                 seq->lcs_exp = NULL;
505         }
506
507         seq->lcs_srv = NULL;
508         EXIT;
509 }
510 EXPORT_SYMBOL(seq_client_fini);