/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2012, 2017, Intel Corporation.
 * Use is subject to license terms.
 *
 * Author: Johann Lombardi <johann.lombardi@intel.com>
 * Author: Niu    Yawei    <yawei.niu@intel.com>
 */

#define DEBUG_SUBSYSTEM S_LQUOTA

#include <lustre_dlm.h>
#include <obd_class.h>
#include <lustre_swab.h>

#include "qsd_internal.h"

typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *desc, void *data,
                           int flag);
static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;

typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;

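/*
 * ldlm enqueue parameters used by the quota slave: both the global index lock
 * and the per-ID quota locks are plain CR locks; they only differ in the
 * blocking and glimpse callbacks attached to them.
 */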
struct ldlm_enqueue_info qsd_glb_einfo = {
        .ei_type        = LDLM_PLAIN,
        .ei_mode        = LCK_CR,
        .ei_cb_bl       = qsd_glb_blocking_ast,
        .ei_cb_cp       = ldlm_completion_ast,
        .ei_cb_gl       = qsd_glb_glimpse_ast,
};

struct ldlm_enqueue_info qsd_id_einfo = {
        .ei_type        = LDLM_PLAIN,
        .ei_mode        = LCK_CR,
        .ei_cb_bl       = qsd_id_blocking_ast,
        .ei_cb_cp       = ldlm_completion_ast,
        .ei_cb_gl       = qsd_id_glimpse_ast,
};

/*
 * Return the qsd_qtype_info structure associated with a global lock
 *
 * \param lock  - is the global lock from which we should extract the qqi
 * \param reset - whether lock->l_ast_data should be cleared
 */
static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
                                                   bool reset)
{
        struct qsd_qtype_info *qqi;

        ENTRY;

        lock_res_and_lock(lock);
        qqi = lock->l_ast_data;
        if (qqi) {
                qqi_getref(qqi);
                if (reset)
                        lock->l_ast_data = NULL;
        }
        unlock_res_and_lock(lock);

        if (qqi)
                /* it is not safe to call lu_ref_add() under a spinlock */
                lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);

        if (reset && qqi) {
                /* release the qqi reference held by the lock */
                lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
                qqi_putref(qqi);
        }
        RETURN(qqi);
}

/*
 * Return the lquota entry structure associated with a per-ID lock
 *
 * \param lock  - is the per-ID lock from which we should extract the lquota
 *                entry
 * \param reset - whether lock->l_ast_data should be cleared
 */
static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
                                                bool reset)
{
        struct lquota_entry *lqe;

        ENTRY;

        lock_res_and_lock(lock);
        lqe = lock->l_ast_data;
        if (lqe) {
                lqe_getref(lqe);
                if (reset)
                        lock->l_ast_data = NULL;
        }
        unlock_res_and_lock(lock);

        if (reset && lqe)
                /* release the lqe reference held by the lock */
                lqe_putref(lqe);
        RETURN(lqe);
}

/*
 * Glimpse callback handler for all quota locks. This function extracts
 * information from the glimpse request.
 *
 * \param req  - is the glimpse ptlrpc request
 * \param desc - is set to the glimpse descriptor describing the purpose of
 *               the glimpse request.
 * \param lvb  - is set to point to the lvb in the reply buffer
 *
 * \retval 0 on success, in which case \desc and \lvb point to valid
 *         structures; appropriate error on failure
 */
static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
                                  struct ldlm_gl_lquota_desc **desc, void **lvb)
{
        int rc;

        ENTRY;

        LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);

        /* glimpse on quota locks always packs a glimpse descriptor */
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC);

        /* extract glimpse descriptor */
        *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
        if (!*desc)
                RETURN(-EFAULT);

        if (ptlrpc_req_need_swab(req))
                lustre_swab_gl_lquota_desc(*desc);

        /* prepare reply */
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(struct lquota_lvb));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc != 0) {
                CERROR("Can't pack response, rc %d\n", rc);
                RETURN(rc);
        }

        /* extract lvb */
        *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);

        RETURN(0);
}

/*
 * Blocking callback handler for the global index lock
 *
 * \param lock - is the lock for which the AST occurred.
 * \param desc - is the description of a conflicting lock in case of blocking
 *               AST.
 * \param data - is the value of lock->l_ast_data
 * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 *               cancellation and blocking ASTs.
 */
static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                                struct ldlm_lock_desc *desc, void *data,
                                int flag)
{
        int rc = 0;

        ENTRY;

        switch (flag) {
        case LDLM_CB_BLOCKING: {
                struct lustre_handle lockh;

                LDLM_DEBUG(lock, "blocking AST on global quota lock");
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
                break;
        }
        case LDLM_CB_CANCELING: {
                struct qsd_qtype_info *qqi;

                LDLM_DEBUG(lock, "canceling global quota lock");

                qqi = qsd_glb_ast_data_get(lock, true);
                if (!qqi)
                        break;

                /*
                 * we are losing the global index lock, so let's mark the
                 * global & slave indexes as not up-to-date any more
                 */
                write_lock(&qqi->qqi_qsd->qsd_lock);
                qqi->qqi_glb_uptodate = false;
                qqi->qqi_slv_uptodate = false;
                if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
                        memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
                write_unlock(&qqi->qqi_qsd->qsd_lock);

                CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
                       qqi->qqi_qsd->qsd_svname, qtype_name(qqi->qqi_qtype));

                /*
                 * Kick off the reintegration thread if it isn't running
                 * already. If this is just a local cancel (for stack cleanup
                 * or eviction), don't re-trigger the reintegration.
                 */
                if (!ldlm_is_local_only(lock))
                        qsd_start_reint_thread(qqi);

                lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
                qqi_putref(qqi);
                break;
        }
        default:
                LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
        }

        RETURN(rc);
}

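/*
 * Hash iterator callback: re-evaluate quota enforcement for cached entries
 * that rely on the default quota setting after that setting has changed.
 */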
static int qsd_entry_def_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                                 struct hlist_node *hnode, void *data)
{
        struct qsd_qtype_info *qqi = (struct qsd_qtype_info *)data;
        struct lquota_entry *lqe;

        lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
        LASSERT(atomic_read(&lqe->lqe_ref) > 0);

        if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default)
                return 0;

        lqe_write_lock(lqe);
        if (qqi->qqi_default_hardlimit == 0 && qqi->qqi_default_softlimit == 0)
                lqe->lqe_enforced = false;
        else
                lqe->lqe_enforced = true;
        lqe_write_unlock(lqe);

        return 0;
}

/* Update the quota entries after receiving a default quota update
 *
 * \param qqi       - is the qsd_qtype_info associated with the quota entries
 * \param hardlimit - new hardlimit of the default quota
 * \param softlimit - new softlimit of the default quota
 * \param gracetime - new gracetime of the default quota
 */
void qsd_update_default_quota(struct qsd_qtype_info *qqi, __u64 hardlimit,
                              __u64 softlimit, __u64 gracetime)
{
        CDEBUG(D_QUOTA, "%s: update default quota setting from QMT.\n",
               qqi->qqi_qsd->qsd_svname);

        qqi->qqi_default_hardlimit = hardlimit;
        qqi->qqi_default_softlimit = softlimit;
        qqi->qqi_default_gracetime = gracetime;

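        /*
         * walk all cached entries and refresh the enforcement state of those
         * using the default quota setting
         */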
        cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash,
                               qsd_entry_def_iter_cb, qqi);
}

/*
 * Glimpse callback handler for global quota lock.
 *
 * \param lock - is the lock targeted by the glimpse
 * \param data - is a pointer to the glimpse ptlrpc request
 */
static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ptlrpc_request *req = data;
        struct qsd_qtype_info *qqi;
        struct ldlm_gl_lquota_desc *desc;
        struct lquota_lvb *lvb;
        struct lquota_glb_rec rec;
        int rc;

        ENTRY;

        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
        if (rc)
                GOTO(out, rc);

        qqi = qsd_glb_ast_data_get(lock, false);
        if (!qqi)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);

        CDEBUG(D_QUOTA,
               "%s: glimpse on glb quota locks, id:%llu ver:%llu hard:%llu soft:%llu\n",
               qqi->qqi_qsd->qsd_svname,
               desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
               desc->gl_softlimit);

        if (desc->gl_ver == 0) {
                CERROR("%s: invalid global index version %llu\n",
                       qqi->qqi_qsd->qsd_svname, desc->gl_ver);
                GOTO(out_qqi, rc = -EINVAL);
        }

        /* extract new hard & soft limits from the glimpse descriptor */
        rec.qbr_hardlimit = desc->gl_hardlimit;
        rec.qbr_softlimit = desc->gl_softlimit;
        rec.qbr_time      = desc->gl_time;
        rec.qbr_granted   = 0;

        if (desc->gl_id.qid_uid == 0)
                qsd_update_default_quota(qqi, desc->gl_hardlimit,
                                         desc->gl_softlimit, desc->gl_time);

        /*
         * We can't afford disk I/O in the context of the glimpse callback
         * handling thread, so the on-disk update of the global limits has to
         * be deferred.
         */
        qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
                         desc->gl_ver, true);
        EXIT;
out_qqi:
        lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
        qqi_putref(qqi);
out:
        req->rq_status = rc;
        return rc;
}

/**
 * Blocking callback handler for a per-ID lock
 *
 * \param lock - is the lock for which the AST occurred.
 * \param desc - is the description of a conflicting lock in case of blocking
 *               AST.
 * \param data - is the value of lock->l_ast_data
 * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 *               cancellation and blocking ASTs.
 */
static int qsd_id_blocking_ast(struct ldlm_lock *lock,
                               struct ldlm_lock_desc *desc,
                               void *data, int flag)
{
        struct lustre_handle lockh;
        int rc = 0;

        ENTRY;

        switch (flag) {
        case LDLM_CB_BLOCKING: {
                LDLM_DEBUG(lock, "blocking AST on ID quota lock");
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
                break;
        }
        case LDLM_CB_CANCELING: {
                struct lu_env *env;
                struct lquota_entry *lqe;

                LDLM_DEBUG(lock, "canceling ID quota lock");
                lqe = qsd_id_ast_data_get(lock, true);
                if (!lqe)
                        break;

                LQUOTA_DEBUG(lqe, "losing ID lock");

                ldlm_lock2handle(lock, &lockh);
                lqe_write_lock(lqe);
                if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
                        /* Clear lqe_lockh & reset qunit to 0 */
                        qsd_set_qunit(lqe, 0);
                        memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
                        qsd_set_edquot(lqe, false);
                }
                lqe_write_unlock(lqe);

                /*
                 * If there is a dqacq in flight, the release will be skipped
                 * for now and triggered on dqacq completion later, which
                 * means there could be a short window during which the slave
                 * holds spare grant without a per-ID lock.
                 */

                /*
                 * don't release quota space for a local cancel (stack cleanup
                 * or eviction)
                 */
                if (!ldlm_is_local_only(lock)) {
                        /* allocate environment */
                        OBD_ALLOC_PTR(env);
                        if (!env) {
                                lqe_putref(lqe);
                                rc = -ENOMEM;
                                break;
                        }

                        /* initialize environment */
                        rc = lu_env_init(env, LCT_DT_THREAD);
                        if (rc) {
                                OBD_FREE_PTR(env);
                                lqe_putref(lqe);
                                break;
                        }

                        rc = qsd_adjust(env, lqe);

                        lu_env_fini(env);
                        OBD_FREE_PTR(env);
                }

                /* release lqe reference grabbed by qsd_id_ast_data_get() */
                lqe_putref(lqe);
                break;
        }
        default:
                LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
        }

        RETURN(rc);
}

/*
 * Glimpse callback handler for per-ID quota locks.
 *
 * \param lock - is the lock targeted by the glimpse
 * \param data - is a pointer to the glimpse ptlrpc request
 */
static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ptlrpc_request *req = data;
        struct lquota_entry *lqe;
        struct ldlm_gl_lquota_desc *desc;
        struct lquota_lvb *lvb;
        int rc;
        bool wakeup = false;

        ENTRY;

        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
        if (rc)
                GOTO(out, rc);

        lqe = qsd_id_ast_data_get(lock, false);
        if (!lqe)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);

        LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu, edquot:%d",
                     desc->gl_qunit, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));

        lqe_write_lock(lqe);
        lvb->lvb_id_rel = 0;
        if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
                long long space;

                /* extract new qunit from glimpse request */
                qsd_set_qunit(lqe, desc->gl_qunit);

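                /*
                 * Estimate how much spare space the slave now holds: granted
                 * space minus what is already being released, current usage,
                 * in-flight and queued writes, and the new qunit which is
                 * kept as a cushion. A positive result can be given back to
                 * the master.
                 */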
                space  = lqe->lqe_granted - lqe->lqe_pending_rel;
                space -= lqe->lqe_usage;
                space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
                space -= lqe->lqe_qunit;

                if (space > 0) {
                        if (lqe->lqe_pending_req > 0) {
                                LQUOTA_DEBUG(lqe,
                                             "request in flight, postpone release of %lld",
                                             space);
                                lvb->lvb_id_may_rel = space;
                        } else {
                                lqe->lqe_pending_req++;

                                /* release quota space in glimpse reply */
                                LQUOTA_DEBUG(lqe, "releasing %lld", space);
                                lqe->lqe_granted -= space;
                                lvb->lvb_id_rel   = space;

                                lqe_write_unlock(lqe);
                                /* schedule a deferred update to record the
                                 * new lqe_granted value */
                                qsd_upd_schedule(lqe2qqi(lqe), lqe,
                                                 &lqe->lqe_id,
                                                 (union lquota_rec *)
                                                  &lqe->lqe_granted, 0, false);
                                lqe_write_lock(lqe);

                                lqe->lqe_pending_req--;
                                wakeup = true;
                        }
                }
        }

        qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
        lqe_write_unlock(lqe);

        if (wakeup)
                wake_up(&lqe->lqe_waiters);
        lqe_putref(lqe);
out:
        req->rq_status = rc;
        RETURN(rc);
}

/**
 * Check whether the slave already owns an ldlm lock for a given quota
 * identifier.
 *
 * \param lockh  - is the local lock handle from the lquota entry.
 * \param rlockh - is the remote lock handle of the matched lock, if any.
 *
 * \retval 0      : on successful lookup, \lockh contains the lock handle.
 * \retval -ENOENT: no lock found
 */
int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
{
        struct ldlm_lock *lock;
        int rc;

        ENTRY;

        LASSERT(lockh);

        if (!lustre_handle_is_used(lockh))
                RETURN(-ENOENT);

        rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
        if (rc)
                RETURN(-ENOENT);

        LASSERT(lustre_handle_is_used(lockh));
        ldlm_lock_dump_handle(D_QUOTA, lockh);

        if (!rlockh)
                /* caller not interested in remote handle */
                RETURN(0);

        /*
         * look up lock associated with local handle and extract remote handle
         * to be packed in quota request
         */
        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        lustre_handle_copy(rlockh, &lock->l_remote_handle);
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}

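/*
 * Release and cancel the per-ID lock associated with \lqe, provided the entry
 * no longer has any usage, granted space or in-flight/queued writes.
 */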
int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
{
        struct qsd_thread_info *qti = qsd_info(env);
        int rc;

        ENTRY;

        lqe_write_lock(lqe);
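        /*
         * The per-ID lock is still needed as long as the entry has in-flight
         * or queued writes, or non-zero usage or granted space.
         */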
        if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
            lqe->lqe_usage || lqe->lqe_granted) {
                lqe_write_unlock(lqe);
                RETURN(0);
        }

        lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
        if (lustre_handle_is_used(&qti->qti_lockh)) {
                memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
                qsd_set_qunit(lqe, 0);
                qsd_set_edquot(lqe, false);
        }
        lqe_write_unlock(lqe);

        rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
        if (rc)
                RETURN(rc);

        ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
        RETURN(0);
}