Whamcloud - gitweb
LU-6142 lov: Fix style issues for lov_ea.c
[fs/lustre-release.git] / lustre / quota / qsd_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <lustre_dlm.h>
34 #include <obd_class.h>
35 #include <lustre_swab.h>
36
37 #include "qsd_internal.h"
38
39 typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
40                            struct ldlm_lock_desc *desc, void *data,
41                            int flag);
42 static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;
43
44 typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
45 static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;
46
47 struct ldlm_enqueue_info qsd_glb_einfo = {
48         .ei_type        = LDLM_PLAIN,
49         .ei_mode        = LCK_CR,
50         .ei_cb_bl       = qsd_glb_blocking_ast,
51         .ei_cb_cp       = ldlm_completion_ast,
52         .ei_cb_gl       = qsd_glb_glimpse_ast,
53 };
54
55 struct ldlm_enqueue_info qsd_id_einfo = {
56         .ei_type        = LDLM_PLAIN,
57         .ei_mode        = LCK_CR,
58         .ei_cb_bl       = qsd_id_blocking_ast,
59         .ei_cb_cp       = ldlm_completion_ast,
60         .ei_cb_gl       = qsd_id_glimpse_ast,
61 };
62
63 /*
64  * Return qsd_qtype_info structure associated with a global lock
65  *
66  * \param lock - is the global lock from which we should extract the qqi
67  * \param reset - whether lock->l_ast_data should be cleared
68  */
69 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
70                                                    bool reset) {
71         struct qsd_qtype_info *qqi;
72         ENTRY;
73
74         lock_res_and_lock(lock);
75         qqi = lock->l_ast_data;
76         if (qqi != NULL) {
77                 qqi_getref(qqi);
78                 if (reset)
79                         lock->l_ast_data = NULL;
80         }
81         unlock_res_and_lock(lock);
82
83         if (qqi != NULL)
84                 /* it is not safe to call lu_ref_add() under spinlock */
85                 lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
86
87         if (reset && qqi != NULL) {
88                 /* release qqi reference hold for the lock */
89                 lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
90                 qqi_putref(qqi);
91         }
92         RETURN(qqi);
93 }
94
95 /*
96  * Return lquota entry structure associated with a per-ID lock
97  *
98  * \param lock - is the per-ID lock from which we should extract the lquota
99  *               entry
100  * \param reset - whether lock->l_ast_data should be cleared
101  */
102 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
103                                                 bool reset) {
104         struct lquota_entry *lqe;
105         ENTRY;
106
107         lock_res_and_lock(lock);
108         lqe = lock->l_ast_data;
109         if (lqe != NULL) {
110                 lqe_getref(lqe);
111                 if (reset)
112                         lock->l_ast_data = NULL;
113         }
114         unlock_res_and_lock(lock);
115
116         if (reset && lqe != NULL)
117                 /* release lqe reference hold for the lock */
118                 lqe_putref(lqe);
119         RETURN(lqe);
120 }
121
122 /*
123  * Glimpse callback handler for all quota locks. This function extracts
124  * information from the glimpse request.
125  *
126  * \param lock - is the lock targeted by the glimpse
127  * \param data - is a pointer to the glimpse ptlrpc request
128  * \param req  - is the glimpse request
129  * \param desc - is the glimpse descriptor describing the purpose of the glimpse
130  *               request.
131  * \param lvb  - is the pointer to the lvb in the reply buffer
132  *
133  * \retval 0 on success and \desc, \lvb & \arg point to a valid structures,
134  *         appropriate error on failure
135  */
136 static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
137                                   struct ldlm_gl_lquota_desc **desc, void **lvb)
138 {
139         int rc;
140         ENTRY;
141
142         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
143
144         /* glimpse on quota locks always packs a glimpse descriptor */
145         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC);
146
147         /* extract glimpse descriptor */
148         *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
149         if (*desc == NULL)
150                 RETURN(-EFAULT);
151
152         if (ptlrpc_req_need_swab(req))
153                 lustre_swab_gl_lquota_desc(*desc);
154
155         /* prepare reply */
156         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
157                              sizeof(struct lquota_lvb));
158         rc = req_capsule_server_pack(&req->rq_pill);
159         if (rc != 0) {
160                 CERROR("Can't pack response, rc %d\n", rc);
161                 RETURN(rc);
162         }
163
164         /* extract lvb */
165         *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
166
167         RETURN(0);
168 }
169
170 /*
171  * Blocking callback handler for global index lock
172  *
173  * \param lock - is the lock for which ast occurred.
174  * \param desc - is the description of a conflicting lock in case of blocking
175  *               ast.
176  * \param data - is the value of lock->l_ast_data
177  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
178  *               cancellation and blocking ast's.
179  */
180 static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
181                                 struct ldlm_lock_desc *desc, void *data,
182                                 int flag)
183 {
184         int rc = 0;
185         ENTRY;
186
187         switch(flag) {
188         case LDLM_CB_BLOCKING: {
189                 struct lustre_handle lockh;
190
191                 LDLM_DEBUG(lock, "blocking AST on global quota lock");
192                 ldlm_lock2handle(lock, &lockh);
193                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
194                 break;
195         }
196         case LDLM_CB_CANCELING: {
197                 struct qsd_qtype_info *qqi;
198
199                 LDLM_DEBUG(lock, "canceling global quota lock");
200
201                 qqi = qsd_glb_ast_data_get(lock, true);
202                 if (qqi == NULL)
203                         break;
204
205                 /* we are losing the global index lock, so let's mark the
206                  * global & slave indexes as not up-to-date any more */
207                 write_lock(&qqi->qqi_qsd->qsd_lock);
208                 qqi->qqi_glb_uptodate = false;
209                 qqi->qqi_slv_uptodate = false;
210                 if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
211                         memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
212                 write_unlock(&qqi->qqi_qsd->qsd_lock);
213
214                 CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
215                        qqi->qqi_qsd->qsd_svname, qtype_name((qqi->qqi_qtype)));
216
217                 /* kick off reintegration thread if not running already, if
218                  * it's just local cancel (for stack clean up or eviction),
219                  * don't re-trigger the reintegration. */
220                 if (!ldlm_is_local_only(lock))
221                         qsd_start_reint_thread(qqi);
222
223                 lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
224                 qqi_putref(qqi);
225                 break;
226         }
227         default:
228                 LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
229         }
230
231         RETURN(rc);
232 }
233
234 static int qsd_entry_def_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
235                                  struct hlist_node *hnode, void *data)
236 {
237         struct qsd_qtype_info   *qqi = (struct qsd_qtype_info *)data;
238         struct lquota_entry     *lqe;
239
240         lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
241         LASSERT(atomic_read(&lqe->lqe_ref) > 0);
242
243         if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default)
244                 return 0;
245
246         lqe_write_lock(lqe);
247         if (qqi->qqi_default_hardlimit == 0 && qqi->qqi_default_softlimit == 0)
248                 lqe->lqe_enforced = false;
249         else
250                 lqe->lqe_enforced = true;
251         lqe_write_unlock(lqe);
252
253         return 0;
254 }
255
256 /* Update the quota entries after receiving default quota update
257  *
258  * \param qqi       - is the qsd_qtype_info associated with the quota entries
259  * \param hardlimit - new hardlimit of default quota
260  * \param softlimit - new softlimit of default quota
261  * \param gracetime - new gracetime of default quota
262  */
263 void qsd_update_default_quota(struct qsd_qtype_info *qqi, __u64 hardlimit,
264                               __u64 softlimit, __u64 gracetime)
265 {
266         CDEBUG(D_QUOTA, "%s: update default quota setting from QMT.\n",
267                qqi->qqi_qsd->qsd_svname);
268
269         qqi->qqi_default_hardlimit = hardlimit;
270         qqi->qqi_default_softlimit = softlimit;
271         qqi->qqi_default_gracetime = gracetime;
272
273         cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash,
274                                qsd_entry_def_iter_cb, qqi);
275 }
276
277 /*
278  * Glimpse callback handler for global quota lock.
279  *
280  * \param lock - is the lock targeted by the glimpse
281  * \param data - is a pointer to the glimpse ptlrpc request
282  */
283 static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
284 {
285         struct ptlrpc_request           *req = data;
286         struct qsd_qtype_info           *qqi;
287         struct ldlm_gl_lquota_desc      *desc;
288         struct lquota_lvb               *lvb;
289         struct lquota_glb_rec            rec;
290         int                              rc;
291         ENTRY;
292
293         rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
294         if (rc)
295                 GOTO(out, rc);
296
297         qqi = qsd_glb_ast_data_get(lock, false);
298         if (qqi == NULL)
299                 /* valid race */
300                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
301
302         CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:%llu ver:%llu"
303                " hard:" "%llu soft:%llu\n", qqi->qqi_qsd->qsd_svname,
304                desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
305                desc->gl_softlimit);
306
307         if (desc->gl_ver == 0) {
308                 CERROR("%s: invalid global index version %llu\n",
309                        qqi->qqi_qsd->qsd_svname, desc->gl_ver);
310                 GOTO(out_qqi, rc = -EINVAL);
311         }
312
313         /* extract new hard & soft limits from the glimpse descriptor */
314         rec.qbr_hardlimit = desc->gl_hardlimit;
315         rec.qbr_softlimit = desc->gl_softlimit;
316         rec.qbr_time      = desc->gl_time;
317         rec.qbr_granted   = 0;
318
319         if (desc->gl_id.qid_uid == 0)
320                 qsd_update_default_quota(qqi, desc->gl_hardlimit,
321                                          desc->gl_softlimit, desc->gl_time);
322
323         /* We can't afford disk io in the context of glimpse callback handling
324          * thread, so the on-disk global limits update has to be deferred. */
325         qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
326                          desc->gl_ver, true);
327         EXIT;
328 out_qqi:
329         lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
330         qqi_putref(qqi);
331 out:
332         req->rq_status = rc;
333         return rc;
334 }
335
336 /**
337  * Blocking callback handler for per-ID lock
338  *
339  * \param lock - is the lock for which ast occurred.
340  * \param desc - is the description of a conflicting lock in case of blocking
341  *               ast.
342  * \param data - is the value of lock->l_ast_data
343  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
344  *               cancellation and blocking ast's.
345  */
346 static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
347                                void *data, int flag)
348 {
349         struct lustre_handle    lockh;
350         int                     rc = 0;
351         ENTRY;
352
353         switch(flag) {
354         case LDLM_CB_BLOCKING: {
355
356                 LDLM_DEBUG(lock, "blocking AST on ID quota lock");
357                 ldlm_lock2handle(lock, &lockh);
358                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
359                 break;
360         }
361         case LDLM_CB_CANCELING: {
362                 struct lu_env           *env;
363                 struct lquota_entry     *lqe;
364
365                 LDLM_DEBUG(lock, "canceling ID quota lock");
366                 lqe = qsd_id_ast_data_get(lock, true);
367                 if (lqe == NULL)
368                         break;
369
370                 LQUOTA_DEBUG(lqe, "losing ID lock");
371
372                 ldlm_lock2handle(lock, &lockh);
373                 lqe_write_lock(lqe);
374                 if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
375                         /* Clear lqe_lockh & reset qunit to 0 */
376                         qsd_set_qunit(lqe, 0);
377                         memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
378                         qsd_set_edquot(lqe, false);
379                 }
380                 lqe_write_unlock(lqe);
381
382                 /* If there is dqacq inflight, the release will be skipped
383                  * at this time, and triggered on dqacq completion later,
384                  * which means there could be a short window that slave is
385                  * holding spare grant wihtout per-ID lock. */
386
387                 /* don't release quota space for local cancel (stack clean
388                  * up or eviction) */
389                 if (!ldlm_is_local_only(lock)) {
390                         /* allocate environment */
391                         OBD_ALLOC_PTR(env);
392                         if (env == NULL) {
393                                 lqe_putref(lqe);
394                                 rc = -ENOMEM;
395                                 break;
396                         }
397
398                         /* initialize environment */
399                         rc = lu_env_init(env, LCT_DT_THREAD);
400                         if (rc) {
401                                 OBD_FREE_PTR(env);
402                                 lqe_putref(lqe);
403                                 break;
404                         }
405
406                         rc = qsd_adjust(env, lqe);
407
408                         lu_env_fini(env);
409                         OBD_FREE_PTR(env);
410                 }
411
412                 /* release lqe reference grabbed by qsd_id_ast_data_get() */
413                 lqe_putref(lqe);
414                 break;
415         }
416         default:
417                 LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
418         }
419
420         RETURN(rc);
421 }
422
423 /*
424  * Glimpse callback handler for per-ID quota locks.
425  *
426  * \param lock - is the lock targeted by the glimpse
427  * \param data - is a pointer to the glimpse ptlrpc request
428  */
429 static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
430 {
431         struct ptlrpc_request           *req = data;
432         struct lquota_entry             *lqe;
433         struct ldlm_gl_lquota_desc      *desc;
434         struct lquota_lvb               *lvb;
435         int                              rc;
436         bool                             wakeup = false;
437         ENTRY;
438
439         rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
440         if (rc)
441                 GOTO(out, rc);
442
443         lqe = qsd_id_ast_data_get(lock, false);
444         if (lqe == NULL)
445                 /* valid race */
446                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
447
448         LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu",
449                      desc->gl_qunit);
450
451         lqe_write_lock(lqe);
452         lvb->lvb_id_rel = 0;
453         if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
454                 long long space;
455
456                 /* extract new qunit from glimpse request */
457                 qsd_set_qunit(lqe, desc->gl_qunit);
458
459                 space  = lqe->lqe_granted - lqe->lqe_pending_rel;
460                 space -= lqe->lqe_usage;
461                 space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
462                 space -= lqe->lqe_qunit;
463
464                 if (space > 0) {
465                         if (lqe->lqe_pending_req > 0) {
466                                 LQUOTA_DEBUG(lqe, "request in flight, postpone "
467                                              "release of %lld", space);
468                                 lvb->lvb_id_may_rel = space;
469                         } else {
470                                 lqe->lqe_pending_req++;
471
472                                 /* release quota space in glimpse reply */
473                                 LQUOTA_DEBUG(lqe, "releasing %lld", space);
474                                 lqe->lqe_granted -= space;
475                                 lvb->lvb_id_rel   = space;
476
477                                 lqe_write_unlock(lqe);
478                                 /* change the lqe_granted */
479                                 qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
480                                                  (union lquota_rec *)&lqe->lqe_granted,
481                                                  0, false);
482                                 lqe_write_lock(lqe);
483
484                                 lqe->lqe_pending_req--;
485                                 wakeup = true;
486                         }
487                 }
488         }
489
490         qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
491         lqe_write_unlock(lqe);
492
493         if (wakeup)
494                 wake_up_all(&lqe->lqe_waiters);
495         lqe_putref(lqe);
496 out:
497         req->rq_status = rc;
498         RETURN(rc);
499 }
500
501 /**
502  * Check whether a slave already own a ldlm lock for the quota identifier \qid.
503  *
504  * \param lockh  - is the local lock handle from lquota entry.
505  * \param rlockh - is the remote lock handle of the matched lock, if any.
506  *
507  * \retval 0      : on successful look up and \lockh contains the lock handle.
508  * \retval -ENOENT: no lock found
509  */
510 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
511 {
512         struct ldlm_lock        *lock;
513         int                      rc;
514         ENTRY;
515
516         LASSERT(lockh);
517
518         if (!lustre_handle_is_used(lockh))
519                 RETURN(-ENOENT);
520
521         rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
522         if (rc)
523                 RETURN(-ENOENT);
524
525         LASSERT(lustre_handle_is_used(lockh));
526         ldlm_lock_dump_handle(D_QUOTA, lockh);
527
528         if (rlockh == NULL)
529                 /* caller not interested in remote handle */
530                 RETURN(0);
531
532         /* look up lock associated with local handle and extract remote handle
533          * to be packed in quota request */
534         lock = ldlm_handle2lock(lockh);
535         LASSERT(lock != NULL);
536         lustre_handle_copy(rlockh, &lock->l_remote_handle);
537         LDLM_LOCK_PUT(lock);
538
539         RETURN(0);
540 }
541
542 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
543 {
544         struct qsd_thread_info  *qti = qsd_info(env);
545         int                      rc;
546         ENTRY;
547
548         lqe_write_lock(lqe);
549         if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
550             lqe->lqe_usage || lqe->lqe_granted) {
551                 lqe_write_unlock(lqe);
552                 RETURN(0);
553         }
554
555         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
556         if (lustre_handle_is_used(&qti->qti_lockh)) {
557                 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
558                 qsd_set_qunit(lqe, 0);
559                 qsd_set_edquot(lqe, false);
560         }
561         lqe_write_unlock(lqe);
562
563         rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
564         if (rc)
565                 RETURN(rc);
566
567         ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
568         RETURN(0);
569 }