Whamcloud - gitweb
75ce7eac8b5ee06faee2fe6688e5cd8c0ed11b47
[fs/lustre-release.git] / lustre / quota / qsd_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <lustre_dlm.h>
34 #include <obd_class.h>
35
36 #include "qsd_internal.h"
37
/* signature of the blocking AST callbacks shared by the global-index and
 * per-ID quota locks below */
typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *desc, void *data,
                           int flag);
static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;

/* signature of the glimpse AST callbacks shared by the global-index and
 * per-ID quota locks below */
typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;

/* enqueue parameters for the global index lock: a plain CR lock with the
 * global blocking & glimpse handlers, default completion AST */
struct ldlm_enqueue_info qsd_glb_einfo = {
        .ei_type        = LDLM_PLAIN,
        .ei_mode        = LCK_CR,
        .ei_cb_bl       = qsd_glb_blocking_ast,
        .ei_cb_cp       = ldlm_completion_ast,
        .ei_cb_gl       = qsd_glb_glimpse_ast,
};

/* enqueue parameters for the per-ID quota lock: a plain CR lock with the
 * per-ID blocking & glimpse handlers, default completion AST */
struct ldlm_enqueue_info qsd_id_einfo = {
        .ei_type        = LDLM_PLAIN,
        .ei_mode        = LCK_CR,
        .ei_cb_bl       = qsd_id_blocking_ast,
        .ei_cb_cp       = ldlm_completion_ast,
        .ei_cb_gl       = qsd_id_glimpse_ast,
};
61
62 /*
63  * Return qsd_qtype_info structure associated with a global lock
64  *
65  * \param lock - is the global lock from which we should extract the qqi
66  * \param reset - whether lock->l_ast_data should be cleared
67  */
68 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
69                                                    bool reset) {
70         struct qsd_qtype_info *qqi;
71         ENTRY;
72
73         lock_res_and_lock(lock);
74         qqi = lock->l_ast_data;
75         if (qqi != NULL) {
76                 qqi_getref(qqi);
77                 if (reset)
78                         lock->l_ast_data = NULL;
79         }
80         unlock_res_and_lock(lock);
81
82         if (qqi != NULL)
83                 /* it is not safe to call lu_ref_add() under spinlock */
84                 lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
85
86         if (reset && qqi != NULL) {
87                 /* release qqi reference hold for the lock */
88                 lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
89                 qqi_putref(qqi);
90         }
91         RETURN(qqi);
92 }
93
94 /*
95  * Return lquota entry structure associated with a per-ID lock
96  *
97  * \param lock - is the per-ID lock from which we should extract the lquota
98  *               entry
99  * \param reset - whether lock->l_ast_data should be cleared
100  */
101 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
102                                                 bool reset) {
103         struct lquota_entry *lqe;
104         ENTRY;
105
106         lock_res_and_lock(lock);
107         lqe = lock->l_ast_data;
108         if (lqe != NULL) {
109                 lqe_getref(lqe);
110                 if (reset)
111                         lock->l_ast_data = NULL;
112         }
113         unlock_res_and_lock(lock);
114
115         if (reset && lqe != NULL)
116                 /* release lqe reference hold for the lock */
117                 lqe_putref(lqe);
118         RETURN(lqe);
119 }
120
/*
 * Glimpse callback handler for all quota locks. This function extracts
 * information from the glimpse request and prepares the reply buffer.
 *
 * \param req  - is the glimpse ptlrpc request
 * \param desc - is set on return to the glimpse descriptor describing the
 *               purpose of the glimpse request.
 * \param lvb  - is set on return to the lvb in the reply buffer
 *
 * \retval 0 on success and \desc & \lvb point to valid structures,
 *         appropriate error on failure
 */
135 static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
136                                   struct ldlm_gl_lquota_desc **desc, void **lvb)
137 {
138         int rc;
139         ENTRY;
140
141         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
142
143         /* glimpse on quota locks always packs a glimpse descriptor */
144         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
145
146         /* extract glimpse descriptor */
147         *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
148         if (*desc == NULL)
149                 RETURN(-EFAULT);
150
151         /* prepare reply */
152         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
153                              sizeof(struct lquota_lvb));
154         rc = req_capsule_server_pack(&req->rq_pill);
155         if (rc != 0) {
156                 CERROR("Can't pack response, rc %d\n", rc);
157                 RETURN(rc);
158         }
159
160         /* extract lvb */
161         *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
162
163         RETURN(0);
164 }
165
166 /*
167  * Blocking callback handler for global index lock
168  *
169  * \param lock - is the lock for which ast occurred.
170  * \param desc - is the description of a conflicting lock in case of blocking
171  *               ast.
172  * \param data - is the value of lock->l_ast_data
173  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
174  *               cancellation and blocking ast's.
175  */
static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                                struct ldlm_lock_desc *desc, void *data,
                                int flag)
{
        int rc = 0;
        ENTRY;

        switch(flag) {
        case LDLM_CB_BLOCKING: {
                struct lustre_handle lockh;

                LDLM_DEBUG(lock, "blocking AST on global quota lock");
                ldlm_lock2handle(lock, &lockh);
                /* a conflicting lock was requested: cancel ours
                 * asynchronously */
                rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
                break;
        }
        case LDLM_CB_CANCELING: {
                struct qsd_qtype_info *qqi;

                LDLM_DEBUG(lock, "canceling global quota lock");

                /* fetch the qqi attached to the lock; passing reset=true
                 * clears l_ast_data so this cancel path runs at most once
                 * per lock */
                qqi = qsd_glb_ast_data_get(lock, true);
                if (qqi == NULL)
                        break;

                /* we are losing the global index lock, so let's mark the
                 * global & slave indexes as not up-to-date any more */
                write_lock(&qqi->qqi_qsd->qsd_lock);
                qqi->qqi_glb_uptodate = false;
                qqi->qqi_slv_uptodate = false;
                /* forget the cached lock handle only if it refers to this
                 * very lock */
                if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
                        memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
                write_unlock(&qqi->qqi_qsd->qsd_lock);

                CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
                       qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));

                /* kick off reintegration thread if not running already, if
                 * it's just local cancel (for stack clean up or eviction),
                 * don't re-trigger the reintegration. */
                if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
                        qsd_start_reint_thread(qqi);

                /* drop the reference + lu_ref taken by
                 * qsd_glb_ast_data_get() above */
                lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
                qqi_putref(qqi);
                break;
        }
        default:
                LASSERTF(0, "invalid flags for blocking ast %d", flag);
        }

        RETURN(rc);
}
229
230 /*
231  * Glimpse callback handler for global quota lock.
232  *
233  * \param lock - is the lock targeted by the glimpse
234  * \param data - is a pointer to the glimpse ptlrpc request
235  */
static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ptlrpc_request           *req = data;
        struct qsd_qtype_info           *qqi;
        struct ldlm_gl_lquota_desc      *desc;
        struct lquota_lvb               *lvb;
        struct lquota_glb_rec            rec;
        int                              rc;
        ENTRY;

        /* unpack the glimpse descriptor and set up the lvb reply buffer */
        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
        if (rc)
                GOTO(out, rc);

        /* reset=false: the lock keeps its own qqi reference, we only take a
         * temporary one for the duration of this handler */
        qqi = qsd_glb_ast_data_get(lock, false);
        if (qqi == NULL)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);

        CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
               " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
               desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
               desc->gl_softlimit);

        /* a version of 0 is never valid for a global index update */
        if (desc->gl_ver == 0) {
                CERROR("%s: invalid global index version "LPU64"\n",
                       qqi->qqi_qsd->qsd_svname, desc->gl_ver);
                GOTO(out_qqi, rc = -EINVAL);
        }

        /* extract new hard & soft limits from the glimpse descriptor */
        rec.qbr_hardlimit = desc->gl_hardlimit;
        rec.qbr_softlimit = desc->gl_softlimit;
        rec.qbr_time      = desc->gl_time;
        rec.qbr_granted   = 0;

        /* We can't afford disk io in the context of glimpse callback handling
         * thread, so the on-disk global limits update has to be deferred. */
        qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
                         desc->gl_ver, true);
        EXIT;
out_qqi:
        /* drop the reference + lu_ref taken by qsd_glb_ast_data_get() */
        lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
        qqi_putref(qqi);
out:
        req->rq_status = rc;
        return rc;
}
284
285 /**
286  * Blocking callback handler for per-ID lock
287  *
288  * \param lock - is the lock for which ast occurred.
289  * \param desc - is the description of a conflicting lock in case of blocking
290  *               ast.
291  * \param data - is the value of lock->l_ast_data
292  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
293  *               cancellation and blocking ast's.
294  */
295 static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
296                                void *data, int flag)
297 {
298         struct lustre_handle    lockh;
299         int                     rc = 0;
300         ENTRY;
301
302         switch(flag) {
303         case LDLM_CB_BLOCKING: {
304
305                 LDLM_DEBUG(lock, "blocking AST on ID quota lock");
306                 ldlm_lock2handle(lock, &lockh);
307                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
308                 break;
309         }
310         case LDLM_CB_CANCELING: {
311                 struct lu_env           *env;
312                 struct lquota_entry     *lqe;
313                 bool                     rel = false;
314
315                 LDLM_DEBUG(lock, "canceling global quota lock");
316                 lqe = qsd_id_ast_data_get(lock, true);
317                 if (lqe == NULL)
318                         break;
319
320                 LQUOTA_DEBUG(lqe, "losing ID lock");
321
322                 /* just local cancel (for stack clean up or eviction), don't
323                  * release quota space in this case */
324                 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
325                         lqe_putref(lqe);
326                         break;
327                 }
328
329                 /* allocate environment */
330                 OBD_ALLOC_PTR(env);
331                 if (env == NULL) {
332                         lqe_putref(lqe);
333                         rc = -ENOMEM;
334                         break;
335                 }
336
337                 /* initialize environment */
338                 rc = lu_env_init(env, LCT_DT_THREAD);
339                 if (rc) {
340                         OBD_FREE_PTR(env);
341                         lqe_putref(lqe);
342                         break;
343                 }
344
345                 ldlm_lock2handle(lock, &lockh);
346                 lqe_write_lock(lqe);
347                 if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
348                         /* Clear lqe_lockh & reset qunit to 0 */
349                         qsd_set_qunit(lqe, 0);
350                         memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
351                         lqe->lqe_edquot = false;
352                         rel = true;
353                 }
354                 lqe_write_unlock(lqe);
355
356                 /* If there is qqacq inflight, the release will be skipped
357                  * at this time, and triggered on dqacq completion later,
358                  * which means there could be a short window that slave is
359                  * holding spare grant wihtout per-ID lock. */
360                 if (rel)
361                         rc = qsd_adjust(env, lqe);
362
363                 /* release lqe reference grabbed by qsd_id_ast_data_get() */
364                 lqe_putref(lqe);
365                 lu_env_fini(env);
366                 OBD_FREE_PTR(env);
367                 break;
368         }
369         default:
370                 LASSERTF(0, "invalid flags for blocking ast %d", flag);
371         }
372
373         RETURN(rc);
374 }
375
376 /*
377  * Glimpse callback handler for per-ID quota locks.
378  *
379  * \param lock - is the lock targeted by the glimpse
380  * \param data - is a pointer to the glimpse ptlrpc request
381  */
static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ptlrpc_request           *req = data;
        struct lquota_entry             *lqe;
        struct qsd_instance             *qsd;
        struct ldlm_gl_lquota_desc      *desc;
        struct lquota_lvb               *lvb;
        int                              rc;
        bool                             wakeup = false;
        ENTRY;

        /* unpack the glimpse descriptor and set up the lvb reply buffer */
        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
        if (rc)
                GOTO(out, rc);

        /* reset=false: the lock keeps its own lqe reference, we only take a
         * temporary one for the duration of this handler */
        lqe = qsd_id_ast_data_get(lock, false);
        if (lqe == NULL)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);

        LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
                     desc->gl_qunit);

        qsd = lqe2qqi(lqe)->qqi_qsd;

        lqe_write_lock(lqe);
        lvb->lvb_id_rel = 0;
        if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
                long long space;

                /* extract new qunit from glimpse request */
                qsd_set_qunit(lqe, desc->gl_qunit);

                /* compute how much granted space exceeds current and
                 * upcoming usage plus one spare qunit */
                space  = lqe->lqe_granted - lqe->lqe_pending_rel;
                space -= lqe->lqe_usage;
                space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
                space -= lqe->lqe_qunit;

                if (space > 0) {
                        if (lqe->lqe_pending_req > 0) {
                                /* a quota request is already in flight:
                                 * only advertise what could be released */
                                LQUOTA_DEBUG(lqe, "request in flight, postpone "
                                             "release of "LPD64, space);
                                lvb->lvb_id_may_rel = space;
                        } else {
                                lqe->lqe_pending_req++;

                                /* release quota space in glimpse reply */
                                LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
                                lqe->lqe_granted -= space;
                                lvb->lvb_id_rel   = space;

                                /* NOTE(review): the lqe lock is dropped
                                 * around qsd_upd_schedule() — presumably it
                                 * may block or take other locks; confirm
                                 * against qsd_upd_schedule() */
                                lqe_write_unlock(lqe);
                                /* change the lqe_granted */
                                qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
                                                 (union lquota_rec *)&lqe->lqe_granted,
                                                 0, false);
                                lqe_write_lock(lqe);

                                lqe->lqe_pending_req--;
                                wakeup = true;
                        }
                }
        }

        /* mirror the master's over-quota flag into the entry */
        lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
        lqe_write_unlock(lqe);

        /* wake up threads waiting on this entry now that the in-flight
         * request marker is gone */
        if (wakeup)
                wake_up_all(&lqe->lqe_waiters);
        lqe_putref(lqe);
out:
        req->rq_status = rc;
        RETURN(rc);
}
456
457 /**
458  * Check whether a slave already own a ldlm lock for the quota identifier \qid.
459  *
460  * \param lockh  - is the local lock handle from lquota entry.
461  * \param rlockh - is the remote lock handle of the matched lock, if any.
462  *
463  * \retval 0      : on successful look up and \lockh contains the lock handle.
464  * \retval -ENOENT: no lock found
465  */
466 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
467 {
468         struct ldlm_lock        *lock;
469         int                      rc;
470         ENTRY;
471
472         LASSERT(lockh);
473
474         if (!lustre_handle_is_used(lockh))
475                 RETURN(-ENOENT);
476
477         rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
478         if (rc)
479                 RETURN(-ENOENT);
480
481         LASSERT(lustre_handle_is_used(lockh));
482         ldlm_lock_dump_handle(D_QUOTA, lockh);
483
484         if (rlockh == NULL)
485                 /* caller not interested in remote handle */
486                 RETURN(0);
487
488         /* look up lock associated with local handle and extract remote handle
489          * to be packed in quota request */
490         lock = ldlm_handle2lock(lockh);
491         LASSERT(lock != NULL);
492         lustre_handle_copy(rlockh, &lock->l_remote_handle);
493         LDLM_LOCK_PUT(lock);
494
495         RETURN(0);
496 }
497
498 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
499 {
500         struct qsd_thread_info  *qti = qsd_info(env);
501         int                      rc;
502         ENTRY;
503
504         lqe_write_lock(lqe);
505         if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
506             lqe->lqe_usage || lqe->lqe_granted) {
507                 lqe_write_unlock(lqe);
508                 RETURN(0);
509         }
510
511         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
512         if (lustre_handle_is_used(&qti->qti_lockh)) {
513                 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
514                 qsd_set_qunit(lqe, 0);
515                 lqe->lqe_edquot = false;
516         }
517         lqe_write_unlock(lqe);
518
519         rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
520         if (rc)
521                 RETURN(rc);
522
523         ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
524         RETURN(0);
525 }