Whamcloud - gitweb
1649d63802acd4820ec4f26f520e324f7c9000a6
[fs/lustre-release.git] / lustre / quota / qsd_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
39
40 #include "qsd_internal.h"
41
/* Signatures of the DLM blocking and glimpse AST handlers defined below;
 * forward-declared here so they can be referenced by the enqueue info
 * structures before their definitions. */
typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *desc, void *data,
                           int flag);
static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;

typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;
49
/* DLM enqueue parameters used when a slave acquires the global index lock:
 * a plain CR lock with the global-index blocking/glimpse callbacks */
struct ldlm_enqueue_info qsd_glb_einfo = {
        .ei_type        = LDLM_PLAIN,
        .ei_mode        = LCK_CR,
        .ei_cb_bl       = qsd_glb_blocking_ast,
        .ei_cb_cp       = ldlm_completion_ast,
        .ei_cb_gl       = qsd_glb_glimpse_ast,
};
57
/* DLM enqueue parameters used for per-ID quota locks: same plain CR mode,
 * but with the per-ID blocking/glimpse callbacks */
struct ldlm_enqueue_info qsd_id_einfo = {
        .ei_type        = LDLM_PLAIN,
        .ei_mode        = LCK_CR,
        .ei_cb_bl       = qsd_id_blocking_ast,
        .ei_cb_cp       = ldlm_completion_ast,
        .ei_cb_gl       = qsd_id_glimpse_ast,
};
65
66 /*
67  * Return qsd_qtype_info structure associated with a global lock
68  *
69  * \param lock - is the global lock from which we should extract the qqi
70  * \param reset - whether lock->l_ast_data should be cleared
71  */
72 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
73                                                    bool reset) {
74         struct qsd_qtype_info *qqi;
75         ENTRY;
76
77         lock_res_and_lock(lock);
78         qqi = lock->l_ast_data;
79         if (qqi != NULL) {
80                 qqi_getref(qqi);
81                 if (reset)
82                         lock->l_ast_data = NULL;
83         }
84         unlock_res_and_lock(lock);
85
86         if (qqi != NULL)
87                 /* it is not safe to call lu_ref_add() under spinlock */
88                 lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
89
90         if (reset && qqi != NULL) {
91                 /* release qqi reference hold for the lock */
92                 lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
93                 qqi_putref(qqi);
94         }
95         RETURN(qqi);
96 }
97
98 /*
99  * Return lquota entry structure associated with a per-ID lock
100  *
101  * \param lock - is the per-ID lock from which we should extract the lquota
102  *               entry
103  * \param reset - whether lock->l_ast_data should be cleared
104  */
105 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
106                                                 bool reset) {
107         struct lquota_entry *lqe;
108         ENTRY;
109
110         lock_res_and_lock(lock);
111         lqe = lock->l_ast_data;
112         if (lqe != NULL) {
113                 lqe_getref(lqe);
114                 if (reset)
115                         lock->l_ast_data = NULL;
116         }
117         unlock_res_and_lock(lock);
118
119         if (reset && lqe != NULL)
120                 /* release lqe reference hold for the lock */
121                 lqe_putref(lqe);
122         RETURN(lqe);
123 }
124
125 /*
126  * Glimpse callback handler for all quota locks. This function extracts
127  * information from the glimpse request.
128  *
129  * \param lock - is the lock targeted by the glimpse
130  * \param data - is a pointer to the glimpse ptlrpc request
131  * \param req  - is the glimpse request
132  * \param desc - is the glimpse descriptor describing the purpose of the glimpse
133  *               request.
134  * \param lvb  - is the pointer to the lvb in the reply buffer
135  *
136  * \retval 0 on success and \desc, \lvb & \arg point to a valid structures,
137  *         appropriate error on failure
138  */
139 static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
140                                   struct ldlm_gl_lquota_desc **desc, void **lvb)
141 {
142         int rc;
143         ENTRY;
144
145         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
146
147         /* glimpse on quota locks always packs a glimpse descriptor */
148         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
149
150         /* extract glimpse descriptor */
151         *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
152         if (*desc == NULL)
153                 RETURN(-EFAULT);
154
155         /* prepare reply */
156         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
157                              sizeof(struct lquota_lvb));
158         rc = req_capsule_server_pack(&req->rq_pill);
159         if (rc != 0) {
160                 CERROR("Can't pack response, rc %d\n", rc);
161                 RETURN(rc);
162         }
163
164         /* extract lvb */
165         *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
166
167         RETURN(0);
168 }
169
/*
 * Blocking callback handler for the global index lock.
 *
 * \param lock - is the lock for which ast occurred.
 * \param desc - is the description of a conflicting lock in case of blocking
 *               ast.
 * \param data - is the value of lock->l_ast_data
 * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 *               cancellation and blocking ast's.
 */
static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                                struct ldlm_lock_desc *desc, void *data,
                                int flag)
{
        int rc = 0;
        ENTRY;

        switch(flag) {
        case LDLM_CB_BLOCKING: {
                struct lustre_handle lockh;

                /* a conflicting lock was requested: give the lock back by
                 * issuing an asynchronous cancel */
                LDLM_DEBUG(lock, "blocking AST on global quota lock");
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
                break;
        }
        case LDLM_CB_CANCELING: {
                struct qsd_qtype_info *qqi;

                LDLM_DEBUG(lock, "canceling global quota lock");

                /* detach the qqi from the lock; reset=true also drops the
                 * reference the lock was holding on the qqi */
                qqi = qsd_glb_ast_data_get(lock, true);
                if (qqi == NULL)
                        break;

                /* we are losing the global index lock, so let's mark the
                 * global & slave indexes as not up-to-date any more */
                write_lock(&qqi->qqi_qsd->qsd_lock);
                qqi->qqi_glb_uptodate = false;
                qqi->qqi_slv_uptodate = false;
                /* clear the cached handle only if it still refers to this
                 * lock (a newer lock may have replaced it) */
                if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
                        memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
                write_unlock(&qqi->qqi_qsd->qsd_lock);

                CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
                       qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));

                /* kick off reintegration thread if not running already, if
                 * it's just local cancel (for stack clean up or eviction),
                 * don't re-trigger the reintegration. */
                if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
                        qsd_start_reint_thread(qqi);

                /* drop the reference taken by qsd_glb_ast_data_get() */
                lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
                qqi_putref(qqi);
                break;
        }
        default:
                LASSERTF(0, "invalid flags for blocking ast %d", flag);
        }

        RETURN(rc);
}
233
/*
 * Glimpse callback handler for the global quota lock. The master uses the
 * glimpse descriptor to push new hard/soft limits to the slave.
 *
 * \param lock - is the lock targeted by the glimpse
 * \param data - is a pointer to the glimpse ptlrpc request
 */
static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ptlrpc_request           *req = data;
        struct qsd_qtype_info           *qqi;
        struct ldlm_gl_lquota_desc      *desc;
        struct lquota_lvb               *lvb;
        struct lquota_glb_rec            rec;
        int                              rc;
        ENTRY;

        /* extract the glimpse descriptor & prepare the lvb reply buffer */
        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
        if (rc)
                GOTO(out, rc);

        /* reset=false: take a reference on the qqi without detaching it */
        qqi = qsd_glb_ast_data_get(lock, false);
        if (qqi == NULL)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);

        CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
               " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
               desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
               desc->gl_softlimit);

        /* a zero version would make the deferred index update meaningless */
        if (desc->gl_ver == 0) {
                CERROR("%s: invalid global index version "LPU64"\n",
                       qqi->qqi_qsd->qsd_svname, desc->gl_ver);
                GOTO(out_qqi, rc = -EINVAL);
        }

        /* extract new hard & soft limits from the glimpse descriptor */
        rec.qbr_hardlimit = desc->gl_hardlimit;
        rec.qbr_softlimit = desc->gl_softlimit;
        rec.qbr_time      = desc->gl_time;
        rec.qbr_granted   = 0;

        /* We can't afford disk io in the context of glimpse callback handling
         * thread, so the on-disk global limits update has to be deferred. */
        qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
                         desc->gl_ver, true);
        EXIT;
out_qqi:
        /* drop the reference taken by qsd_glb_ast_data_get() */
        lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
        qqi_putref(qqi);
out:
        req->rq_status = rc;
        return rc;
}
288
289 /**
290  * Blocking callback handler for per-ID lock
291  *
292  * \param lock - is the lock for which ast occurred.
293  * \param desc - is the description of a conflicting lock in case of blocking
294  *               ast.
295  * \param data - is the value of lock->l_ast_data
296  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
297  *               cancellation and blocking ast's.
298  */
299 static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
300                                void *data, int flag)
301 {
302         struct lustre_handle    lockh;
303         int                     rc = 0;
304         ENTRY;
305
306         switch(flag) {
307         case LDLM_CB_BLOCKING: {
308
309                 LDLM_DEBUG(lock, "blocking AST on ID quota lock");
310                 ldlm_lock2handle(lock, &lockh);
311                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
312                 break;
313         }
314         case LDLM_CB_CANCELING: {
315                 struct lu_env           *env;
316                 struct lquota_entry     *lqe;
317                 bool                     rel = false;
318
319                 LDLM_DEBUG(lock, "canceling global quota lock");
320                 lqe = qsd_id_ast_data_get(lock, true);
321                 if (lqe == NULL)
322                         break;
323
324                 LQUOTA_DEBUG(lqe, "losing ID lock");
325
326                 /* just local cancel (for stack clean up or eviction), don't
327                  * release quota space in this case */
328                 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
329                         lqe_putref(lqe);
330                         break;
331                 }
332
333                 /* allocate environment */
334                 OBD_ALLOC_PTR(env);
335                 if (env == NULL) {
336                         lqe_putref(lqe);
337                         rc = -ENOMEM;
338                         break;
339                 }
340
341                 /* initialize environment */
342                 rc = lu_env_init(env, LCT_DT_THREAD);
343                 if (rc) {
344                         OBD_FREE_PTR(env);
345                         lqe_putref(lqe);
346                         break;
347                 }
348
349                 ldlm_lock2handle(lock, &lockh);
350                 lqe_write_lock(lqe);
351                 if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
352                         /* Clear lqe_lockh & reset qunit to 0 */
353                         qsd_set_qunit(lqe, 0);
354                         memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
355                         lqe->lqe_edquot = false;
356                         rel = true;
357                 }
358                 lqe_write_unlock(lqe);
359
360                 /* If there is qqacq inflight, the release will be skipped
361                  * at this time, and triggered on dqacq completion later,
362                  * which means there could be a short window that slave is
363                  * holding spare grant wihtout per-ID lock. */
364                 if (rel)
365                         rc = qsd_adjust(env, lqe);
366
367                 /* release lqe reference grabbed by qsd_id_ast_data_get() */
368                 lqe_putref(lqe);
369                 lu_env_fini(env);
370                 OBD_FREE_PTR(env);
371                 break;
372         }
373         default:
374                 LASSERTF(0, "invalid flags for blocking ast %d", flag);
375         }
376
377         RETURN(rc);
378 }
379
/*
 * Glimpse callback handler for per-ID quota locks. The master uses the
 * glimpse to shrink the qunit and/or claim back spare quota space.
 *
 * \param lock - is the lock targeted by the glimpse
 * \param data - is a pointer to the glimpse ptlrpc request
 */
static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ptlrpc_request           *req = data;
        struct lquota_entry             *lqe;
        struct qsd_instance             *qsd;
        struct ldlm_gl_lquota_desc      *desc;
        struct lquota_lvb               *lvb;
        int                              rc;
        bool                             wakeup = false;
        ENTRY;

        /* extract the glimpse descriptor & prepare the lvb reply buffer */
        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
        if (rc)
                GOTO(out, rc);

        /* reset=false: take a reference on the lqe without detaching it */
        lqe = qsd_id_ast_data_get(lock, false);
        if (lqe == NULL)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);

        LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
                     desc->gl_qunit);

        /* NOTE(review): qsd is assigned but not used afterwards in this
         * function as shown here — verify whether it can be dropped */
        qsd = lqe2qqi(lqe)->qqi_qsd;

        lqe_write_lock(lqe);
        lvb->lvb_id_rel = 0;
        if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
                long long space;

                /* extract new qunit from glimpse request */
                qsd_set_qunit(lqe, desc->gl_qunit);

                /* how much spare space this slave holds beyond what it needs
                 * for current usage, in-flight writes and one qunit */
                space  = lqe->lqe_granted - lqe->lqe_pending_rel;
                space -= lqe->lqe_usage;
                space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
                space -= lqe->lqe_qunit;

                if (space > 0) {
                        if (lqe->lqe_pending_req > 0) {
                                /* a dqacq/dqrel is in flight; only advertise
                                 * how much could be released */
                                LQUOTA_DEBUG(lqe, "request in flight, postpone "
                                             "release of "LPD64, space);
                                lvb->lvb_id_may_rel = space;
                        } else {
                                lqe->lqe_pending_req++;

                                /* release quota space in glimpse reply */
                                LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
                                lqe->lqe_granted -= space;
                                lvb->lvb_id_rel   = space;

                                /* lqe lock is dropped here because
                                 * qsd_upd_schedule() cannot be called with
                                 * it held; lqe_pending_req keeps others out */
                                lqe_write_unlock(lqe);
                                /* change the lqe_granted */
                                qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
                                                 (union lquota_rec *)&lqe->lqe_granted,
                                                 0, false);
                                lqe_write_lock(lqe);

                                lqe->lqe_pending_req--;
                                wakeup = true;
                        }
                }
        }

        /* mirror the master's over-quota flag */
        lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
        lqe_write_unlock(lqe);

        /* wake up threads waiting for the in-flight request to complete */
        if (wakeup)
                cfs_waitq_broadcast(&lqe->lqe_waiters);
        lqe_putref(lqe);
out:
        req->rq_status = rc;
        RETURN(rc);
}
460
461 /**
462  * Check whether a slave already own a ldlm lock for the quota identifier \qid.
463  *
464  * \param lockh  - is the local lock handle from lquota entry.
465  * \param rlockh - is the remote lock handle of the matched lock, if any.
466  *
467  * \retval 0      : on successful look up and \lockh contains the lock handle.
468  * \retval -ENOENT: no lock found
469  */
470 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
471 {
472         struct ldlm_lock        *lock;
473         int                      rc;
474         ENTRY;
475
476         LASSERT(lockh);
477
478         if (!lustre_handle_is_used(lockh))
479                 RETURN(-ENOENT);
480
481         rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
482         if (rc)
483                 RETURN(-ENOENT);
484
485         LASSERT(lustre_handle_is_used(lockh));
486         ldlm_lock_dump_handle(D_QUOTA, lockh);
487
488         if (rlockh == NULL)
489                 /* caller not interested in remote handle */
490                 RETURN(0);
491
492         /* look up lock associated with local handle and extract remote handle
493          * to be packed in quota request */
494         lock = ldlm_handle2lock(lockh);
495         LASSERT(lock != NULL);
496         lustre_handle_copy(rlockh, &lock->l_remote_handle);
497         LDLM_LOCK_PUT(lock);
498
499         RETURN(0);
500 }
501
502 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
503 {
504         struct qsd_thread_info  *qti = qsd_info(env);
505         int                      rc;
506         ENTRY;
507
508         lqe_write_lock(lqe);
509         if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
510             lqe->lqe_usage || lqe->lqe_granted) {
511                 lqe_write_unlock(lqe);
512                 RETURN(0);
513         }
514
515         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
516         if (lustre_handle_is_used(&qti->qti_lockh)) {
517                 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
518                 qsd_set_qunit(lqe, 0);
519                 lqe->lqe_edquot = false;
520         }
521         lqe_write_unlock(lqe);
522
523         rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
524         if (rc)
525                 RETURN(rc);
526
527         ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
528         RETURN(0);
529 }