Whamcloud - gitweb
LU-6635 lfsck: block replacing the OST-object for test
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/fid/fid_request.c
33  *
34  * Lustre Sequence Manager
35  *
36  * Author: Yury Umanets <umka@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_FID
40
41 #include <linux/module.h>
42 #include <libcfs/libcfs.h>
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <obd_support.h>
46 #include <lustre_fid.h>
47 /* mdc RPC locks */
48 #include <lustre_mdc.h>
49 #include "fid_internal.h"
50
51 static int seq_client_rpc(struct lu_client_seq *seq,
52                           struct lu_seq_range *output, __u32 opc,
53                           const char *opcname)
54 {
55         struct obd_export     *exp = seq->lcs_exp;
56         struct ptlrpc_request *req;
57         struct lu_seq_range   *out, *in;
58         __u32                 *op;
59         unsigned int           debug_mask;
60         int                    rc;
61         ENTRY;
62
63         LASSERT(exp != NULL && !IS_ERR(exp));
64         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
65                                         LUSTRE_MDS_VERSION, SEQ_QUERY);
66         if (req == NULL)
67                 RETURN(-ENOMEM);
68
69         /* Init operation code */
70         op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
71         *op = opc;
72
73         /* Zero out input range, this is not recovery yet. */
74         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
75         lu_seq_range_init(in);
76
77         ptlrpc_request_set_replen(req);
78
79         in->lsr_index = seq->lcs_space.lsr_index;
80         if (seq->lcs_type == LUSTRE_SEQ_METADATA)
81                 fld_range_set_mdt(in);
82         else
83                 fld_range_set_ost(in);
84
85         if (opc == SEQ_ALLOC_SUPER) {
86                 req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
87                 req->rq_reply_portal = MDC_REPLY_PORTAL;
88                 /* During allocating super sequence for data object,
89                  * the current thread might hold the export of MDT0(MDT0
90                  * precreating objects on this OST), and it will send the
91                  * request to MDT0 here, so we can not keep resending the
92                  * request here, otherwise if MDT0 is failed(umounted),
93                  * it can not release the export of MDT0 */
94                 if (seq->lcs_type == LUSTRE_SEQ_DATA)
95                         req->rq_no_delay = req->rq_no_resend = 1;
96                 debug_mask = D_CONSOLE;
97         } else {
98                 if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
99                         req->rq_reply_portal = MDC_REPLY_PORTAL;
100                         req->rq_request_portal = SEQ_METADATA_PORTAL;
101                 } else {
102                         req->rq_reply_portal = OSC_REPLY_PORTAL;
103                         req->rq_request_portal = SEQ_DATA_PORTAL;
104                 }
105
106                 debug_mask = D_INFO;
107         }
108
109         /* Allow seq client RPC during recovery time. */
110         req->rq_allow_replay = 1;
111
112         ptlrpc_at_set_req_timeout(req);
113
114         rc = ptlrpc_queue_wait(req);
115
116         if (rc)
117                 GOTO(out_req, rc);
118
119         out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
120         *output = *out;
121
122         if (!lu_seq_range_is_sane(output)) {
123                 CERROR("%s: Invalid range received from server: "
124                        DRANGE"\n", seq->lcs_name, PRANGE(output));
125                 GOTO(out_req, rc = -EINVAL);
126         }
127
128         if (lu_seq_range_is_exhausted(output)) {
129                 CERROR("%s: Range received from server is exhausted: "
130                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
131                 GOTO(out_req, rc = -EINVAL);
132         }
133
134         CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n",
135                      seq->lcs_name, opcname, PRANGE(output));
136
137         EXIT;
138 out_req:
139         ptlrpc_req_finished(req);
140         return rc;
141 }
142
143 /* Request sequence-controller node to allocate new super-sequence. */
144 int seq_client_alloc_super(struct lu_client_seq *seq,
145                            const struct lu_env *env)
146 {
147         int rc;
148         ENTRY;
149
150         mutex_lock(&seq->lcs_mutex);
151
152         if (seq->lcs_srv) {
153 #ifdef HAVE_SEQ_SERVER
154                 LASSERT(env != NULL);
155                 rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space,
156                                             env);
157 #else
158                 rc = 0;
159 #endif
160         } else {
161                 /* Check whether the connection to seq controller has been
162                  * setup (lcs_exp != NULL) */
163                 if (seq->lcs_exp == NULL) {
164                         mutex_unlock(&seq->lcs_mutex);
165                         RETURN(-EINPROGRESS);
166                 }
167
168                 rc = seq_client_rpc(seq, &seq->lcs_space,
169                                     SEQ_ALLOC_SUPER, "super");
170         }
171         mutex_unlock(&seq->lcs_mutex);
172         RETURN(rc);
173 }
174
175 /* Request sequence-controller node to allocate new meta-sequence. */
176 static int seq_client_alloc_meta(const struct lu_env *env,
177                                  struct lu_client_seq *seq)
178 {
179         int rc;
180         ENTRY;
181
182         if (seq->lcs_srv) {
183 #ifdef HAVE_SEQ_SERVER
184                 LASSERT(env != NULL);
185                 rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env);
186 #else
187                 rc = 0;
188 #endif
189         } else {
190                 do {
191                         /* If meta server return -EINPROGRESS or EAGAIN,
192                          * it means meta server might not be ready to
193                          * allocate super sequence from sequence controller
194                          * (MDT0)yet */
195                         rc = seq_client_rpc(seq, &seq->lcs_space,
196                                             SEQ_ALLOC_META, "meta");
197                 } while (rc == -EINPROGRESS || rc == -EAGAIN);
198         }
199
200         RETURN(rc);
201 }
202
203 /* Allocate new sequence for client. */
204 static int seq_client_alloc_seq(const struct lu_env *env,
205                                 struct lu_client_seq *seq, u64 *seqnr)
206 {
207         int rc;
208         ENTRY;
209
210         LASSERT(lu_seq_range_is_sane(&seq->lcs_space));
211
212         if (lu_seq_range_is_exhausted(&seq->lcs_space)) {
213                 rc = seq_client_alloc_meta(env, seq);
214                 if (rc) {
215                         CERROR("%s: Can't allocate new meta-sequence,"
216                                "rc %d\n", seq->lcs_name, rc);
217                         RETURN(rc);
218                 } else {
219                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
220                                seq->lcs_name, PRANGE(&seq->lcs_space));
221                 }
222         } else {
223                 rc = 0;
224         }
225
226         LASSERT(!lu_seq_range_is_exhausted(&seq->lcs_space));
227         *seqnr = seq->lcs_space.lsr_start;
228         seq->lcs_space.lsr_start += 1;
229
230         CDEBUG(D_INFO, "%s: Allocated sequence [%#llx]\n", seq->lcs_name,
231                *seqnr);
232
233         RETURN(rc);
234 }
235
236 static int seq_fid_alloc_prep(struct lu_client_seq *seq,
237                               wait_queue_t *link)
238 {
239         if (seq->lcs_update) {
240                 add_wait_queue(&seq->lcs_waitq, link);
241                 set_current_state(TASK_UNINTERRUPTIBLE);
242                 mutex_unlock(&seq->lcs_mutex);
243
244                 schedule();
245
246                 mutex_lock(&seq->lcs_mutex);
247                 remove_wait_queue(&seq->lcs_waitq, link);
248                 set_current_state(TASK_RUNNING);
249                 return -EAGAIN;
250         }
251
252         ++seq->lcs_update;
253         mutex_unlock(&seq->lcs_mutex);
254
255         return 0;
256 }
257
258 static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr,
259                                bool whole)
260 {
261         LASSERT(seq->lcs_update == 1);
262
263         mutex_lock(&seq->lcs_mutex);
264         if (seqnr != 0) {
265                 CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n",
266                        seq->lcs_name, seqnr);
267
268                 seq->lcs_fid.f_seq = seqnr;
269                 if (whole) {
270                         /* Since the caller require the whole seq,
271                          * so marked this seq to be used */
272                         if (seq->lcs_type == LUSTRE_SEQ_METADATA)
273                                 seq->lcs_fid.f_oid =
274                                         LUSTRE_METADATA_SEQ_MAX_WIDTH;
275                         else
276                                 seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
277                 } else {
278                         seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
279                 }
280                 seq->lcs_fid.f_ver = 0;
281         }
282
283         --seq->lcs_update;
284         wake_up_all(&seq->lcs_waitq);
285 }
286
287 /**
288  * Allocate the whole non-used seq to the caller.
289  *
290  * \param[in] env       pointer to the thread context
291  * \param[in,out] seq   pointer to the client sequence manager
292  * \param[out] seqnr    to hold the new allocated sequence
293  *
294  * \retval              0 for new sequence allocated.
295  * \retval              Negative error number on failure.
296  */
297 int seq_client_get_seq(const struct lu_env *env,
298                        struct lu_client_seq *seq, u64 *seqnr)
299 {
300         wait_queue_t link;
301         int rc;
302
303         LASSERT(seqnr != NULL);
304
305         mutex_lock(&seq->lcs_mutex);
306         init_waitqueue_entry(&link, current);
307
308         /* To guarantee that we can get a whole non-used sequence. */
309         while (seq_fid_alloc_prep(seq, &link) != 0);
310
311         rc = seq_client_alloc_seq(env, seq, seqnr);
312         seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true);
313         if (rc)
314                 CERROR("%s: Can't allocate new sequence: rc = %d\n",
315                        seq->lcs_name, rc);
316         mutex_unlock(&seq->lcs_mutex);
317
318         return rc;
319 }
320 EXPORT_SYMBOL(seq_client_get_seq);
321
322 /**
323  * Allocate new fid on passed client @seq and save it to @fid.
324  *
325  * \param[in] env       pointer to the thread context
326  * \param[in,out] seq   pointer to the client sequence manager
327  * \param[out] fid      to hold the new allocated fid
328  *
329  * \retval              1 for notify the caller that sequence switch
330  *                      is performed to allow it to setup FLD for it.
331  * \retval              0 for new FID allocated in current sequence.
332  * \retval              Negative error number on failure.
333  */
334 int seq_client_alloc_fid(const struct lu_env *env,
335                          struct lu_client_seq *seq, struct lu_fid *fid)
336 {
337         wait_queue_t link;
338         int rc;
339         ENTRY;
340
341         LASSERT(seq != NULL);
342         LASSERT(fid != NULL);
343
344         init_waitqueue_entry(&link, current);
345         mutex_lock(&seq->lcs_mutex);
346
347         if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
348                 seq->lcs_fid.f_oid = seq->lcs_width;
349
350         while (1) {
351                 u64 seqnr;
352
353                 if (unlikely(!fid_is_zero(&seq->lcs_fid) &&
354                              fid_oid(&seq->lcs_fid) < seq->lcs_width)) {
355                         /* Just bump last allocated fid and return to caller. */
356                         seq->lcs_fid.f_oid++;
357                         rc = 0;
358                         break;
359                 }
360
361                 /* Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid
362                  * deadlock during seq_client_alloc_seq(). */
363                 rc = seq_fid_alloc_prep(seq, &link);
364                 if (rc)
365                         continue;
366
367                 rc = seq_client_alloc_seq(env, seq, &seqnr);
368                 /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */
369                 seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false);
370                 if (rc) {
371                         CERROR("%s: Can't allocate new sequence: rc = %d\n",
372                                seq->lcs_name, rc);
373                         mutex_unlock(&seq->lcs_mutex);
374
375                         RETURN(rc);
376                 }
377
378                 rc = 1;
379                 break;
380         }
381
382         *fid = seq->lcs_fid;
383         mutex_unlock(&seq->lcs_mutex);
384
385         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
386
387         RETURN(rc);
388 }
389 EXPORT_SYMBOL(seq_client_alloc_fid);
390
391 /*
392  * Finish the current sequence due to disconnect.
393  * See mdc_import_event()
394  */
395 void seq_client_flush(struct lu_client_seq *seq)
396 {
397         wait_queue_t link;
398
399         LASSERT(seq != NULL);
400         init_waitqueue_entry(&link, current);
401         mutex_lock(&seq->lcs_mutex);
402
403         while (seq->lcs_update) {
404                 add_wait_queue(&seq->lcs_waitq, &link);
405                 set_current_state(TASK_UNINTERRUPTIBLE);
406                 mutex_unlock(&seq->lcs_mutex);
407
408                 schedule();
409
410                 mutex_lock(&seq->lcs_mutex);
411                 remove_wait_queue(&seq->lcs_waitq, &link);
412                 set_current_state(TASK_RUNNING);
413         }
414
415         fid_zero(&seq->lcs_fid);
416         /**
417          * this id shld not be used for seq range allocation.
418          * set to -1 for dgb check.
419          */
420
421         seq->lcs_space.lsr_index = -1;
422
423         lu_seq_range_init(&seq->lcs_space);
424         mutex_unlock(&seq->lcs_mutex);
425 }
426 EXPORT_SYMBOL(seq_client_flush);
427
428 static void seq_client_proc_fini(struct lu_client_seq *seq)
429 {
430 #ifdef CONFIG_PROC_FS
431         ENTRY;
432         if (seq->lcs_proc_dir) {
433                 if (!IS_ERR(seq->lcs_proc_dir))
434                         lprocfs_remove(&seq->lcs_proc_dir);
435                 seq->lcs_proc_dir = NULL;
436         }
437         EXIT;
438 #endif /* CONFIG_PROC_FS */
439 }
440
441 static int seq_client_proc_init(struct lu_client_seq *seq)
442 {
443 #ifdef CONFIG_PROC_FS
444         int rc;
445         ENTRY;
446
447         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, seq_type_proc_dir,
448                                              NULL, NULL);
449         if (IS_ERR(seq->lcs_proc_dir)) {
450                 CERROR("%s: LProcFS failed in seq-init\n",
451                        seq->lcs_name);
452                 rc = PTR_ERR(seq->lcs_proc_dir);
453                 RETURN(rc);
454         }
455
456         rc = lprocfs_add_vars(seq->lcs_proc_dir, seq_client_proc_list, seq);
457         if (rc) {
458                 CERROR("%s: Can't init sequence manager "
459                        "proc, rc %d\n", seq->lcs_name, rc);
460                 GOTO(out_cleanup, rc);
461         }
462
463         RETURN(0);
464
465 out_cleanup:
466         seq_client_proc_fini(seq);
467         return rc;
468
469 #else /* !CONFIG_PROC_FS */
470         return 0;
471 #endif /* CONFIG_PROC_FS */
472 }
473
474 int seq_client_init(struct lu_client_seq *seq,
475                     struct obd_export *exp,
476                     enum lu_cli_type type,
477                     const char *prefix,
478                     struct lu_server_seq *srv)
479 {
480         int rc;
481         ENTRY;
482
483         LASSERT(seq != NULL);
484         LASSERT(prefix != NULL);
485
486         seq->lcs_srv = srv;
487         seq->lcs_type = type;
488
489         mutex_init(&seq->lcs_mutex);
490         if (type == LUSTRE_SEQ_METADATA)
491                 seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH;
492         else
493                 seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
494
495         init_waitqueue_head(&seq->lcs_waitq);
496         /* Make sure that things are clear before work is started. */
497         seq_client_flush(seq);
498
499         if (exp != NULL)
500                 seq->lcs_exp = class_export_get(exp);
501
502         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
503                  "cli-%s", prefix);
504
505         rc = seq_client_proc_init(seq);
506         if (rc)
507                 seq_client_fini(seq);
508         RETURN(rc);
509 }
510 EXPORT_SYMBOL(seq_client_init);
511
512 void seq_client_fini(struct lu_client_seq *seq)
513 {
514         ENTRY;
515
516         seq_client_proc_fini(seq);
517
518         if (seq->lcs_exp != NULL) {
519                 class_export_put(seq->lcs_exp);
520                 seq->lcs_exp = NULL;
521         }
522
523         seq->lcs_srv = NULL;
524         EXIT;
525 }
526 EXPORT_SYMBOL(seq_client_fini);
527
528 int client_fid_init(struct obd_device *obd,
529                     struct obd_export *exp, enum lu_cli_type type)
530 {
531         struct client_obd *cli = &obd->u.cli;
532         char *prefix;
533         int rc;
534         ENTRY;
535
536         OBD_ALLOC_PTR(cli->cl_seq);
537         if (cli->cl_seq == NULL)
538                 RETURN(-ENOMEM);
539
540         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
541         if (prefix == NULL)
542                 GOTO(out_free_seq, rc = -ENOMEM);
543
544         snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name);
545
546         /* Init client side sequence-manager */
547         rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL);
548         OBD_FREE(prefix, MAX_OBD_NAME + 5);
549         if (rc)
550                 GOTO(out_free_seq, rc);
551
552         RETURN(rc);
553 out_free_seq:
554         OBD_FREE_PTR(cli->cl_seq);
555         cli->cl_seq = NULL;
556         return rc;
557 }
558 EXPORT_SYMBOL(client_fid_init);
559
560 int client_fid_fini(struct obd_device *obd)
561 {
562         struct client_obd *cli = &obd->u.cli;
563         ENTRY;
564
565         if (cli->cl_seq != NULL) {
566                 seq_client_fini(cli->cl_seq);
567                 OBD_FREE_PTR(cli->cl_seq);
568                 cli->cl_seq = NULL;
569         }
570
571         RETURN(0);
572 }
573 EXPORT_SYMBOL(client_fid_fini);
574
575 struct proc_dir_entry *seq_type_proc_dir;
576
577 static int __init fid_init(void)
578 {
579         seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
580                                              proc_lustre_root,
581                                              NULL, NULL);
582         if (IS_ERR(seq_type_proc_dir))
583                 return PTR_ERR(seq_type_proc_dir);
584
585 # ifdef HAVE_SERVER_SUPPORT
586         fid_server_mod_init();
587 # endif
588
589         return 0;
590 }
591
592 static void __exit fid_exit(void)
593 {
594 # ifdef HAVE_SERVER_SUPPORT
595         fid_server_mod_exit();
596 # endif
597
598         if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
599                 lprocfs_remove(&seq_type_proc_dir);
600                 seq_type_proc_dir = NULL;
601         }
602 }
603
604 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
605 MODULE_DESCRIPTION("Lustre File IDentifier");
606 MODULE_VERSION(LUSTRE_VERSION_STRING);
607 MODULE_LICENSE("GPL");
608
609 module_init(fid_init);
610 module_exit(fid_exit);