4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2012, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Author: Alex Tomas <bzzz@whamcloud.com>
41 #define DEBUG_SUBSYSTEM S_FILTER
43 #include "ofd_internal.h"
/*
 * One pending PFID verification request, queued on
 * ofd_device::ofd_inconsistency_list for the verification thread.
 */
45 struct ofd_inconsistency_item {
46 struct list_head oii_list; /* link in ofd_inconsistency_list */
47 struct ofd_object *oii_obj; /* OST-object whose PFID is to be verified */
48 struct lu_fid oii_pfid; /* PFID supplied by the client */
/*
 * Verify one queued inconsistency item.
 *
 * \param[in] env	execution environment
 * \param[in] ofd	OFD device that owns the item
 * \param[in] oii	the item to verify; its object must have
 *			ofo_pfid_checking set and ofo_pfid_verified clear
 * \param[in] lr	request buffer, filled here with the three FIDs and
 *			handed to lfsck_in_notify()
 *
 * The outcome is read from lr->lr_status under the object write lock:
 * statistics counters on \a ofd are bumped and the result is logged.
 * Before returning, ofo_pfid_checking is cleared and the object reference
 * held by the item is dropped.
 */
51 static void ofd_inconsistency_verify_one(const struct lu_env *env,
52 struct ofd_device *ofd,
53 struct ofd_inconsistency_item *oii,
54 struct lfsck_request *lr)
56 struct ofd_object *fo = oii->oii_obj;
57 struct lu_fid *pfid = &fo->ofo_pfid;
60 LASSERT(fo->ofo_pfid_checking);
61 LASSERT(!fo->ofo_pfid_verified);
63 lr->lr_fid = fo->ofo_header.loh_fid; /* FID of the OST-object itself. */
64 lr->lr_fid2 = oii->oii_pfid; /* PFID given by the client. */
65 lr->lr_fid3 = *pfid; /* PFID stored locally on the OST. */
/* Hand the request to LFSCK; the outcome is read back from lr_status. */
67 rc = lfsck_in_notify(env, ofd->ofd_osd, lr);
68 ofd_write_lock(env, fo);
69 switch (lr->lr_status) {
74 CDEBUG(D_LFSCK, "%s: fail to verify OST local stored "
75 "PFID xattr for "DFID", the client given PFID "
76 DFID", OST local stored PFID "DFID": rc = %d\n",
77 ofd_obd(ofd)->obd_name,
78 PFID(&fo->ofo_header.loh_fid),
79 PFID(&oii->oii_pfid), PFID(pfid), rc);
81 fo->ofo_pfid_verified = 1;
83 case LPVS_INCONSISTENT:
86 ofd->ofd_inconsistency_self_detected++;
88 CDEBUG(D_LFSCK, "%s: fail to verify the client given "
89 "PFID for "DFID", the client given PFID "DFID
90 ", local stored PFID "DFID": rc = %d\n",
91 ofd_obd(ofd)->obd_name,
92 PFID(&fo->ofo_header.loh_fid),
93 PFID(&oii->oii_pfid), PFID(pfid), rc);
95 CDEBUG(D_LFSCK, "%s: both the client given PFID and "
96 "the OST local stored PFID are stale for the "
97 "OST-object "DFID", client given PFID is "DFID
98 ", local stored PFID is "DFID"\n",
99 ofd_obd(ofd)->obd_name,
100 PFID(&fo->ofo_header.loh_fid),
101 PFID(&oii->oii_pfid), PFID(pfid));
103 case LPVS_INCONSISTENT_TOFIX:
104 ofd->ofd_inconsistency_self_detected++;
106 ofd->ofd_inconsistency_self_repaired++;
107 CDEBUG(D_LFSCK, "%s: fixed the staled OST PFID xattr "
108 "for "DFID", with the client given PFID "DFID
109 ", the old stored PFID "DFID"\n",
110 ofd_obd(ofd)->obd_name,
111 PFID(&fo->ofo_header.loh_fid),
112 PFID(&oii->oii_pfid), PFID(pfid));
114 CDEBUG(D_LFSCK, "%s: fail to fix the OST PFID xattr "
115 "for "DFID", client given PFID "DFID", local "
116 "stored PFID "DFID": rc = %d\n",
117 ofd_obd(ofd)->obd_name,
118 PFID(&fo->ofo_header.loh_fid),
119 PFID(&oii->oii_pfid), PFID(pfid), rc);
/* Adopt the client given PFID as the cached PFID of the object. */
121 *pfid = oii->oii_pfid;
122 fo->ofo_pfid_verified = 1;
/* Verification is over, whatever the outcome was. */
127 fo->ofo_pfid_checking = 0;
128 ofd_write_unlock(env, fo);
/* Pairs with the lu_object_get() done when the item was queued. */
130 lu_object_put(env, &fo->ofo_obj.do_lu);
/*
 * Body of the dedicated inconsistency verification thread.
 *
 * \param[in] args	the ofd_device that owns the thread
 *
 * A private lu_env is initialized and the thread state is published as
 * SVC_RUNNING (or SVC_STOPPED when lu_env_init() failed) so that the
 * starter can stop waiting. Queued items are then popped from
 * ofd_inconsistency_list and verified one at a time with the spinlock
 * dropped; when the list is empty the thread sleeps until a new item
 * arrives or it is no longer in the running state. Items that are still
 * queued afterwards are not verified: their objects only get
 * ofo_pfid_checking cleared and their reference released. Finally the
 * thread marks itself SVC_STOPPED and wakes up waiters on t_ctl_waitq.
 */
134 static int ofd_inconsistency_verification_main(void *args)
137 struct ofd_device *ofd = args;
138 struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
139 struct ofd_inconsistency_item *oii;
140 struct lfsck_request *lr = NULL;
141 struct l_wait_info lwi = { 0 };
145 rc = lu_env_init(&env, LCT_DT_THREAD);
/* Tell the starter whether the thread came up or failed early. */
146 spin_lock(&ofd->ofd_inconsistency_lock);
147 thread_set_flags(thread, rc != 0 ? SVC_STOPPED : SVC_RUNNING);
148 wake_up_all(&thread->t_ctl_waitq);
149 spin_unlock(&ofd->ofd_inconsistency_lock);
154 if (unlikely(lr == NULL))
155 GOTO(out, rc = -ENOMEM);
/* These two fields are the same for every request sent by this thread. */
157 lr->lr_event = LE_PAIRS_VERIFY;
158 lr->lr_active = LT_LAYOUT;
160 spin_lock(&ofd->ofd_inconsistency_lock);
162 if (unlikely(!thread_is_running(thread)))
165 while (!list_empty(&ofd->ofd_inconsistency_list)) {
166 oii = list_entry(ofd->ofd_inconsistency_list.next,
167 struct ofd_inconsistency_item,
169 list_del_init(&oii->oii_list);
/* The verification is done without holding the spinlock. */
170 spin_unlock(&ofd->ofd_inconsistency_lock);
171 ofd_inconsistency_verify_one(&env, ofd, oii, lr);
172 spin_lock(&ofd->ofd_inconsistency_lock);
175 spin_unlock(&ofd->ofd_inconsistency_lock);
/* Sleep until there is work to do or the thread leaves the running state. */
176 l_wait_event(thread->t_ctl_waitq,
177 !list_empty(&ofd->ofd_inconsistency_list) ||
178 !thread_is_running(thread),
180 spin_lock(&ofd->ofd_inconsistency_lock);
/* Release whatever is still queued without verifying it. */
183 while (!list_empty(&ofd->ofd_inconsistency_list)) {
184 struct ofd_object *fo;
186 oii = list_entry(ofd->ofd_inconsistency_list.next,
187 struct ofd_inconsistency_item,
189 list_del_init(&oii->oii_list);
191 spin_unlock(&ofd->ofd_inconsistency_lock);
193 ofd_write_lock(&env, fo);
194 fo->ofo_pfid_checking = 0;
195 ofd_write_unlock(&env, fo);
197 lu_object_put(&env, &fo->ofo_obj.do_lu);
199 spin_lock(&ofd->ofd_inconsistency_lock);
/* Announce termination to whoever waits on the control waitqueue. */
207 thread_set_flags(thread, SVC_STOPPED);
208 wake_up_all(&thread->t_ctl_waitq);
209 spin_unlock(&ofd->ofd_inconsistency_lock);
/*
 * Start the inconsistency verification thread of \a ofd.
 *
 * \param[in] ofd	OFD device whose thread is to be started
 *
 * Nothing is started when the thread is already running. Otherwise the
 * thread flags are reset, a kernel thread named
 * "inconsistency_verification" is created, and on success the caller
 * waits until the new thread reports itself as running or stopped.
 * A failure to create the thread is logged with CERROR.
 */
215 int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
217 struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
218 struct l_wait_info lwi = { 0 };
221 spin_lock(&ofd->ofd_inconsistency_lock);
222 if (unlikely(thread_is_running(thread))) {
223 spin_unlock(&ofd->ofd_inconsistency_lock);
/* Reset the state before the new thread publishes its own. */
228 thread_set_flags(thread, 0);
229 spin_unlock(&ofd->ofd_inconsistency_lock);
/* kthread_run() yields an ERR_PTR on failure, decoded here into rc. */
230 rc = PTR_ERR(kthread_run(ofd_inconsistency_verification_main, ofd,
231 "inconsistency_verification"));
232 if (IS_ERR_VALUE(rc)) {
233 CERROR("%s: cannot start self_repair thread: rc = %ld\n",
234 ofd_obd(ofd)->obd_name, rc);
/* Wait for the thread to report either SVC_RUNNING or SVC_STOPPED. */
237 l_wait_event(thread->t_ctl_waitq,
238 thread_is_running(thread) ||
239 thread_is_stopped(thread),
/*
 * Stop the inconsistency verification thread of \a ofd.
 *
 * \param[in] ofd	OFD device whose thread is to be stopped
 *
 * Nothing is done when the thread was never started or has already
 * stopped. Otherwise the thread is flagged SVC_STOPPING, woken up, and
 * the caller waits until it reports itself as stopped.
 */
246 int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd)
248 struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
249 struct l_wait_info lwi = { 0 };
251 spin_lock(&ofd->ofd_inconsistency_lock);
252 if (thread_is_init(thread) || thread_is_stopped(thread)) {
253 spin_unlock(&ofd->ofd_inconsistency_lock);
258 thread_set_flags(thread, SVC_STOPPING);
259 spin_unlock(&ofd->ofd_inconsistency_lock);
/* Kick the thread out of its wait, then wait for it to finish. */
260 wake_up_all(&thread->t_ctl_waitq);
261 l_wait_event(thread->t_ctl_waitq,
262 thread_is_stopped(thread),
/*
 * Queue an OST-object for PFID verification by the dedicated thread.
 *
 * \param[in] env	execution environment
 * \param[in] fo	OST-object whose PFID looks inconsistent
 * \param[in] oa	obdo carrying the client given parent FID
 *			(o_parent_seq, o_parent_oid, o_stripe_idx)
 *
 * A new item holding an extra reference on \a fo and a copy of the
 * client given PFID is appended to ofd_inconsistency_list, the object is
 * flagged ofo_pfid_checking, and the verification thread is woken up.
 * The item is not queued when the object is already being checked or
 * has already been verified. An allocation failure is only logged.
 */
268 static void ofd_add_inconsistency_item(const struct lu_env *env,
269 struct ofd_object *fo, struct obdo *oa)
271 struct ofd_device *ofd = ofd_obj2dev(fo);
272 struct ofd_inconsistency_item *oii;
277 CERROR("%s: cannot alloc memory for verify OST-object "
278 "consistency for "DFID", client given PFID "DFID
279 ", local stored PFID "DFID"\n",
280 ofd_obd(ofd)->obd_name, PFID(&fo->ofo_header.loh_fid),
281 oa->o_parent_seq, oa->o_parent_oid, oa->o_stripe_idx,
282 PFID(&fo->ofo_pfid));
287 INIT_LIST_HEAD(&oii->oii_list);
/* This reference is dropped once the item has been processed. */
288 lu_object_get(&fo->ofo_obj.do_lu);
/* Remember the PFID exactly as the client sent it. */
290 oii->oii_pfid.f_seq = oa->o_parent_seq;
291 oii->oii_pfid.f_oid = oa->o_parent_oid;
292 oii->oii_pfid.f_stripe_idx = oa->o_stripe_idx;
294 spin_lock(&ofd->ofd_inconsistency_lock);
/* Somebody else queued or finished the verification in the meantime. */
295 if (fo->ofo_pfid_checking || fo->ofo_pfid_verified) {
296 spin_unlock(&ofd->ofd_inconsistency_lock);
302 fo->ofo_pfid_checking = 1;
303 if (list_empty(&ofd->ofd_inconsistency_list))
305 list_add_tail(&oii->oii_list, &ofd->ofd_inconsistency_list);
306 spin_unlock(&ofd->ofd_inconsistency_lock);
308 wake_up_all(&ofd->ofd_inconsistency_thread.t_ctl_waitq);
310 /* XXX: When the found inconsistencies exceed some threshold,
311 * we can trigger the LFSCK to scan part of the system
312 * or the whole system, which depends on how to define
313 * the threshold. A simple way may be like this: define
314 * the absolute number of inconsistencies allowed
315 * to be repaired via self detect/repair mechanism, if
316 * exceeded, then trigger the LFSCK to scan the layout
317 * inconsistency within the whole system. */
/*
 * Check the client given parent FID against the one known for \a fo.
 *
 * \param[in] env	execution environment
 * \param[in] fo	OST-object being accessed
 * \param[in] oa	obdo carrying the client given parent FID
 *
 * The cached ofo_pfid is compared first, when it is sane. If that does
 * not settle the matter and no verification is in progress, the filter
 * FID is loaded with ofd_object_ff_load() and compared again. When the
 * values still differ, the object is pushed to the verification thread.
 *
 * \retval -EINPROGRESS	a verification is pending or has just been
 *			queued; the client is expected to retry later
 */
320 int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo,
323 struct lu_fid *pfid = &fo->ofo_pfid;
/* Fast path: compare against the PFID already cached in memory. */
327 if (fid_is_sane(pfid)) {
328 if (likely(oa->o_parent_seq == pfid->f_seq &&
329 oa->o_parent_oid == pfid->f_oid &&
330 oa->o_stripe_idx == pfid->f_stripe_idx))
333 if (fo->ofo_pfid_verified)
337 /* The OST-object may be inconsistent, and we need further verification.
338 * To avoid blocking the RPC service thread, return -EINPROGRESS to client
339 * and make it retry later. */
340 if (fo->ofo_pfid_checking)
341 RETURN(-EINPROGRESS);
343 rc = ofd_object_ff_load(env, fo);
/* Compare once more now that the filter FID has been loaded. */
350 if (likely(oa->o_parent_seq == pfid->f_seq &&
351 oa->o_parent_oid == pfid->f_oid &&
352 oa->o_stripe_idx == pfid->f_stripe_idx))
355 /* Push it to the dedicated thread for further verification. */
356 ofd_add_inconsistency_item(env, fo, oa);
358 RETURN(-EINPROGRESS);
/*
 * Prepare local buffers for a bulk read.
 *
 * \param[in] env	execution environment
 * \param[in] exp	export of the client doing the read
 * \param[in] ofd	OFD device
 * \param[in] fid	FID of the object to read from
 * \param[in] la	buffer filled with the object attributes
 * \param[in] oa	obdo from the client, used for the PFID check
 * \param[in] niocount	number of remote buffers in \a rnb
 * \param[in] rnb	remote buffers
 * \param[in] nr_local	number of local buffers (asserted to end up in
 *			the range 1..PTLRPC_MAX_BRW_PAGES)
 * \param[in] lnb	local buffers
 * \param[in] jobid	job ID used for the read statistics
 *
 * The object is looked up and read-locked, its existence is checked and,
 * when PFID verification is enabled and the client sent OBD_MD_FLFID,
 * its parent FID is verified. Each remote buffer is then mapped to local
 * buffers with dt_bufs_get(), the attributes are fetched, the buffers
 * are prepared with dt_read_prep() and the read is accounted. The code
 * at the end releases the buffers, the lock and the object.
 *
 * \retval -ENOENT	the object does not exist
 */
361 static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
362 struct ofd_device *ofd, const struct lu_fid *fid,
363 struct lu_attr *la, struct obdo *oa, int niocount,
364 struct niobuf_remote *rnb, int *nr_local,
365 struct niobuf_local *lnb, char *jobid)
367 struct ofd_object *fo;
368 int i, j, rc, tot_bytes = 0;
371 LASSERT(env != NULL);
373 fo = ofd_object_find(env, ofd, fid);
378 ofd_read_lock(env, fo);
379 if (!ofd_object_exists(fo))
380 GOTO(unlock, rc = -ENOENT);
/* Verify the parent FID only if enabled and the client supplied one. */
382 if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
383 rc = ofd_verify_ff(env, fo, oa);
388 /* parse remote buffers to local buffers and prepare the latter */
390 for (i = 0, j = 0; i < niocount; i++) {
391 rc = dt_bufs_get(env, ofd_object_child(fo), rnb + i,
392 lnb + j, 0, ofd_object_capa(env, fo));
393 if (unlikely(rc < 0))
395 LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
396 /* correct index for local buffers to continue with */
399 LASSERT(j <= PTLRPC_MAX_BRW_PAGES);
400 tot_bytes += rnb[i].rnb_len;
403 LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
404 rc = dt_attr_get(env, ofd_object_child(fo), la,
405 ofd_object_capa(env, fo));
409 rc = dt_read_prep(env, ofd_object_child(fo), lnb, *nr_local);
413 ofd_counter_incr(exp, LPROC_OFD_STATS_READ, jobid, tot_bytes);
417 dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
419 ofd_read_unlock(env, fo);
420 ofd_object_put(env, fo);
/*
 * Prepare local buffers for a bulk write.
 *
 * \param[in] env	execution environment
 * \param[in] exp	export of the client doing the write
 * \param[in] ofd	OFD device
 * \param[in] fid	FID of the object to write to
 * \param[in] la	object attributes
 * \param[in] oa	obdo from the client (grant info, parent FID)
 * \param[in] objcount	number of objects, asserted to be 1
 * \param[in] obj	I/O object descriptor; ioo_bufcnt remote buffers
 * \param[in] rnb	remote buffers
 * \param[in] nr_local	number of local buffers (asserted to end up in
 *			the range 1..PTLRPC_MAX_BRW_PAGES)
 * \param[in] lnb	local buffers
 * \param[in] jobid	job ID used for the write statistics
 *
 * While the target is recovering the object is looked up with
 * ofd_object_find_or_create() using default attributes; otherwise it is
 * only looked up. The object is read-locked and must exist. When PFID
 * verification is enabled and the client sent OBD_MD_FLFID the parent
 * FID is verified. Grant information is processed, every remote buffer
 * is mapped to local buffers whose flags are copied from the remote
 * buffer, and dt_write_prep() is called before the write is accounted.
 *
 * \retval -ENOENT	the object does not exist
 */
424 static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
425 struct ofd_device *ofd, const struct lu_fid *fid,
426 struct lu_attr *la, struct obdo *oa,
427 int objcount, struct obd_ioobj *obj,
428 struct niobuf_remote *rnb, int *nr_local,
429 struct niobuf_local *lnb, char *jobid)
431 struct ofd_object *fo;
432 int i, j, k, rc = 0, tot_bytes = 0;
435 LASSERT(env != NULL);
436 LASSERT(objcount == 1);
438 if (unlikely(exp->exp_obd->obd_recovering)) {
439 struct ofd_thread_info *info = ofd_info(env);
441 /* copied from ofd_precreate_object */
442 /* XXX this should be consolidated to use the same code
443 * instead of a copy, due to the ongoing risk of bugs. */
444 memset(&info->fti_attr, 0, sizeof(info->fti_attr));
445 info->fti_attr.la_valid = LA_TYPE | LA_MODE;
446 info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | 0666;
447 info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
448 /* Initialize a/c/m time so any client timestamp will always
449 * be newer and update the inode. ctime = 0 is also handled
450 * specially in osd_inode_setattr(). See LU-221, LU-1042 */
451 info->fti_attr.la_atime = 0;
452 info->fti_attr.la_mtime = 0;
453 info->fti_attr.la_ctime = 0;
455 fo = ofd_object_find_or_create(env, ofd, fid, &info->fti_attr);
457 fo = ofd_object_find(env, ofd, fid);
461 GOTO(out, rc = PTR_ERR(fo));
464 ofd_read_lock(env, fo);
465 if (!ofd_object_exists(fo)) {
466 CERROR("%s: BRW to missing obj "DOSTID"\n",
467 exp->exp_obd->obd_name, POSTID(&obj->ioo_oid));
468 ofd_read_unlock(env, fo);
469 ofd_object_put(env, fo);
470 GOTO(out, rc = -ENOENT);
/* Verify the parent FID only if enabled and the client supplied one. */
473 if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
474 rc = ofd_verify_ff(env, fo, oa);
476 ofd_read_unlock(env, fo);
477 ofd_object_put(env, fo);
482 /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
483 * space back if possible */
484 ofd_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
486 /* parse remote buffers to local buffers and prepare the latter */
488 for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
489 rc = dt_bufs_get(env, ofd_object_child(fo),
491 ofd_object_capa(env, fo));
492 if (unlikely(rc < 0))
494 LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
495 /* correct index for local buffers to continue with */
496 for (k = 0; k < rc; k++) {
497 lnb[j+k].lnb_flags = rnb[i].rnb_flags;
/* Pages of a remote buffer without grant are marked -ENOSPC. */
498 if (!(rnb[i].rnb_flags & OBD_BRW_GRANTED))
499 lnb[j+k].lnb_rc = -ENOSPC;
501 /* remote client can't break through quota */
502 if (exp_connect_rmtclient(exp))
503 lnb[j+k].lnb_flags &= ~OBD_BRW_NOQUOTA;
507 LASSERT(j <= PTLRPC_MAX_BRW_PAGES);
508 tot_bytes += rnb[i].rnb_len;
510 LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
512 rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local);
513 if (unlikely(rc != 0))
516 ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid, tot_bytes);
519 dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
520 ofd_read_unlock(env, fo);
521 /* ofd_grant_prepare_write() was called, so we must commit */
522 ofd_grant_commit(env, exp, rc);
524 /* let's still process incoming grant information packed in the oa,
525 * but without enforcing grant since we won't proceed with the write.
526 * Just like a read request actually. */
527 ofd_grant_prepare_read(env, exp, oa);
/*
 * Prepare a bulk read or write: entry point of the pre-I/O phase.
 *
 * \param[in] env	execution environment
 * \param[in] cmd	OBD_BRW_WRITE or OBD_BRW_READ
 * \param[in] exp	export of the client doing the I/O
 * \param[in] oa	obdo from the client
 * \param[in] objcount	number of objects, asserted to be 1
 * \param[in] obj	I/O object descriptor, ioo_bufcnt must be > 0
 * \param[in] rnb	remote buffers
 * \param[in] nr_local	number of local buffers, must not exceed
 *			PTLRPC_MAX_BRW_PAGES
 * \param[in] lnb	local buffers
 * \param[in] oti	transaction info, required for the echo client
 * \param[in] capa	capability checked with ofd_auth_capa()
 *
 * The per-thread info and the job ID are taken from \a oti for the echo
 * client (no request in the session) and from the session otherwise.
 * After the capability check the work is delegated to
 * ofd_preprw_write() or ofd_preprw_read(); any other \a cmd is logged
 * as an error.
 */
531 int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
532 struct obdo *oa, int objcount, struct obd_ioobj *obj,
533 struct niobuf_remote *rnb, int *nr_local,
534 struct niobuf_local *lnb, struct obd_trans_info *oti,
535 struct lustre_capa *capa)
537 struct tgt_session_info *tsi = tgt_ses_info(env);
538 struct ofd_device *ofd = ofd_exp(exp);
539 struct ofd_thread_info *info;
541 const struct lu_fid *fid = &oa->o_oi.oi_fid;
544 if (*nr_local > PTLRPC_MAX_BRW_PAGES) {
/* Adjacent literals are concatenated, so the first one must end with a
 * space to keep "the" and "maximum" apart in the logged message. */
545 CERROR("%s: bulk has too many pages %d, which exceeds the "
546 "maximum pages per RPC of %d\n",
547 exp->exp_obd->obd_name, *nr_local, PTLRPC_MAX_BRW_PAGES);
551 if (tgt_ses_req(tsi) == NULL) { /* echo client case */
552 LASSERT(oti != NULL);
553 info = ofd_info_init(env, exp);
554 ofd_oti2info(info, oti);
555 jobid = oti->oti_jobid;
557 info = tsi2ofd_info(tsi);
558 jobid = tsi->tsi_jobid;
/* Fault injection point for OBD_FAIL_OST_ENOENT. */
563 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) {
564 struct ofd_seq *oseq;
566 oseq = ofd_seq_load(env, ofd, ostid_seq(&oa->o_oi));
568 CERROR("%s: Can not find seq for "DOSTID
569 ": rc = %ld\n", ofd_name(ofd), POSTID(&oa->o_oi),
574 if (oseq->os_destroys_in_progress == 0) {
575 /* don't fail lookups for orphan recovery, it causes
576 * later LBUGs when objects still exist during
578 ofd_seq_put(env, oseq);
581 ofd_seq_put(env, oseq);
584 LASSERT(objcount == 1);
585 LASSERT(obj->ioo_bufcnt > 0);
587 if (cmd == OBD_BRW_WRITE) {
588 rc = ofd_auth_capa(exp, fid, ostid_seq(&oa->o_oi),
589 capa, CAPA_OPC_OSS_WRITE);
591 la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
592 rc = ofd_preprw_write(env, exp, ofd, fid,
593 &info->fti_attr, oa, objcount,
594 obj, rnb, nr_local, lnb, jobid);
596 } else if (cmd == OBD_BRW_READ) {
597 rc = ofd_auth_capa(exp, fid, ostid_seq(&oa->o_oi),
598 capa, CAPA_OPC_OSS_READ);
600 ofd_grant_prepare_read(env, exp, oa);
601 rc = ofd_preprw_read(env, exp, ofd, fid,
603 obj->ioo_bufcnt, rnb, nr_local,
605 obdo_from_la(oa, &info->fti_attr, LA_ATIME);
608 CERROR("%s: wrong cmd %d received!\n",
609 exp->exp_obd->obd_name, cmd);
/*
 * Finish a bulk read.
 *
 * \param[in] env	execution environment
 * \param[in] ofd	OFD device
 * \param[in] fid	FID of the object that was read
 * \param[in] objcount	number of objects
 * \param[in] niocount	number of local buffers, asserted to be > 0
 * \param[in] lnb	local buffers to release
 *
 * The local buffers are released, the read lock is dropped and two
 * object references are put: the one taken by the lookup here and the
 * one left over from ofd_preprw_read().
 */
616 ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd,
617 const struct lu_fid *fid, int objcount, int niocount,
618 struct niobuf_local *lnb)
620 struct ofd_object *fo;
624 LASSERT(niocount > 0);
626 fo = ofd_object_find(env, ofd, fid);
630 LASSERT(ofd_object_exists(fo));
631 dt_bufs_put(env, ofd_object_child(fo), lnb, niocount);
633 ofd_read_unlock(env, fo);
634 ofd_object_put(env, fo);
635 /* second put is pair to object_get in ofd_preprw_read */
636 ofd_object_put(env, fo);
/*
 * Set the attributes a write may have to establish on the object:
 * UID/GID and the filter FID xattr.
 *
 * \param[in] env	execution environment
 * \param[in] ofd	OFD device
 * \param[in] ofd_obj	object being written
 * \param[in] la	attributes; only LA_UID and LA_GID are considered
 *			here, la_valid is restored before returning
 * \param[in] ff	filter FID to store in XATTR_NAME_FID
 *
 * Nothing is done when neither an attribute nor the filter FID needs to
 * be set. Otherwise a local transaction (no transno) is used to set the
 * attributes and the xattr, and the parent FID from \a ff is cached in
 * ofd_obj->ofo_pfid. Several OBD_FAIL_LFSCK_* fault injection points
 * are checked along the way.
 */
642 ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
643 struct ofd_object *ofd_obj, struct lu_attr *la,
644 struct filter_fid *ff)
646 struct ofd_thread_info *info = ofd_info(env);
/* Saved so that la->la_valid can be restored at the end. */
647 __u64 valid = la->la_valid;
650 struct dt_object *dt_obj;
657 dt_obj = ofd_object_child(ofd_obj);
658 LASSERT(dt_obj != NULL);
660 la->la_valid &= LA_UID | LA_GID;
662 rc = ofd_attr_handle_ugid(env, ofd_obj, la, 0 /* !is_setattr */);
667 rc = ofd_object_ff_load(env, ofd_obj);
674 if (!la->la_valid && !ff_needed)
675 /* no attributes to set */
678 th = ofd_trans_create(env, ofd);
680 GOTO(out, rc = PTR_ERR(th));
683 rc = dt_declare_attr_set(env, dt_obj, la, th);
/* Fault injection: store a parent FID that does not match. */
689 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
690 ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
691 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
692 ff->ff_parent.f_oid =
693 cpu_to_le32(le32_to_cpu(ff->ff_parent.f_oid) - 1);
695 info->fti_buf.lb_buf = ff;
696 info->fti_buf.lb_len = sizeof(*ff);
697 rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
698 XATTR_NAME_FID, 0, th);
703 /* We don't need a transno for this operation which will be re-executed
704 * anyway when the OST_WRITE (with a transno assigned) is replayed */
705 rc = dt_trans_start_local(env, ofd->ofd_osd , th);
711 rc = dt_attr_set(env, dt_obj, la, th,
712 ofd_object_capa(env, ofd_obj));
717 /* set filter fid EA */
719 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
722 rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
/* Cache the parent FID in CPU byte order. */
725 ofd_obj->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq);
726 ofd_obj->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid);
727 /* Currently, the filter_fid::ff_parent::f_ver is not
728 * the real parent MDT-object's FID::f_ver, instead it
729 * is the OST-object index in its parent MDT-object's
731 ofd_obj->ofo_pfid.f_stripe_idx =
732 le32_to_cpu(ff->ff_parent.f_stripe_idx);
739 dt_trans_stop(env, ofd->ofd_osd, th);
741 la->la_valid = valid;
/*
 * Commit callback context used to reset the soft sync counter of an
 * export once a transaction has committed.
 */
745 struct ofd_soft_sync_callback {
746 struct dt_txn_commit_cb ossc_cb; /* callback registered on the handle */
747 struct obd_export *ossc_exp; /* export whose counter is reset */
/*
 * Transaction commit callback for soft sync.
 *
 * \param[in] env	execution environment
 * \param[in] th	transaction handle
 * \param[in] cb	callback embedded in an ofd_soft_sync_callback
 * \param[in] err	error code passed to the callback
 *
 * Resets fed_soft_sync_count of the export to zero and drops the export
 * callback reference stored in the callback context.
 */
750 static void ofd_cb_soft_sync(struct lu_env *env, struct thandle *th,
751 struct dt_txn_commit_cb *cb, int err)
753 struct ofd_soft_sync_callback *ossc;
755 ossc = container_of(cb, struct ofd_soft_sync_callback, ossc_cb);
757 CDEBUG(D_INODE, "export %p soft sync count is reset\n", ossc->ossc_exp);
758 atomic_set(&ossc->ossc_exp->exp_filter_data.fed_soft_sync_count, 0);
760 class_export_cb_put(ossc->ossc_exp);
/*
 * Register ofd_cb_soft_sync() as a commit callback on \a th.
 *
 * \param[in] th	transaction handle to attach the callback to
 * \param[in] exp	export whose soft sync counter is to be reset
 *
 * An export callback reference is taken for the callback context and
 * the callback is added with dt_trans_cb_add(), whose result is kept
 * in rc.
 */
764 static int ofd_soft_sync_cb_add(struct thandle *th, struct obd_export *exp)
766 struct ofd_soft_sync_callback *ossc;
767 struct dt_txn_commit_cb *dcb;
774 ossc->ossc_exp = class_export_cb_get(exp);
776 dcb = &ossc->ossc_cb;
777 dcb->dcb_func = ofd_cb_soft_sync;
778 CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
/* strncpy() may leave the name unterminated, hence the explicit NUL. */
779 strncpy(dcb->dcb_name, "ofd_cb_soft_sync", MAX_COMMIT_CB_STR_LEN);
780 dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
782 rc = dt_trans_cb_add(th, dcb);
784 class_export_cb_put(exp);
/*
 * Commit a bulk write.
 *
 * \param[in] env	execution environment
 * \param[in] exp	export of the client doing the write
 * \param[in] ofd	OFD device
 * \param[in] fid	FID of the object being written
 * \param[in] la	attributes to set; filled with the resulting
 *			attributes by dt_attr_get()
 * \param[in] ff	filter FID passed on to ofd_write_attr_set()
 * \param[in] objcount	number of objects, asserted to be 1
 * \param[in] niocount	number of local buffers
 * \param[in] lnb	local buffers holding the data
 * \param[in] old_rc	result of the earlier phase, also passed to
 *			ofd_grant_commit()
 *
 * UID/GID and the filter FID are set first, then a transaction is used
 * to commit the data with dt_write_commit() and to update the time
 * attributes. An -ENOSPC result is retried a limited number of times.
 * The soft sync counter of the export is reset or advanced; reaching
 * ofd_soft_sync_limit triggers dt_commit_async(). At the end the
 * buffers, the read lock and two object references are released and
 * the grant is committed.
 */
792 ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
793 struct ofd_device *ofd, const struct lu_fid *fid,
794 struct lu_attr *la, struct filter_fid *ff, int objcount,
795 int niocount, struct niobuf_local *lnb, int old_rc)
797 struct ofd_thread_info *info = ofd_info(env);
798 struct ofd_object *fo;
804 struct filter_export_data *fed = &exp->exp_filter_data;
805 bool soft_sync = false;
806 bool cb_registered = false;
810 LASSERT(objcount == 1);
812 fo = ofd_object_find(env, ofd, fid);
814 LASSERT(ofd_object_exists(fo));
816 o = ofd_object_child(fo);
820 GOTO(out, rc = old_rc);
823 * The first write to each object must set some attributes. It is
824 * important to set the uid/gid before calling
825 * dt_declare_write_commit() since quota enforcement is now handled in
828 rc = ofd_write_attr_set(env, ofd, fo, la, ff);
/* Only the time attributes are left to be updated below. */
832 la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
835 th = ofd_trans_create(env, ofd);
837 GOTO(out, rc = PTR_ERR(th));
839 th->th_sync |= ofd->ofd_syncjournal;
840 if (th->th_sync == 0) {
841 for (i = 0; i < niocount; i++) {
842 if (!(lnb[i].lnb_flags & OBD_BRW_ASYNC)) {
846 if (lnb[i].lnb_flags & OBD_BRW_SOFT_SYNC)
/* Fault injection point for OBD_FAIL_OST_DQACQ_NET. */
851 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
852 GOTO(out_stop, rc = -EINPROGRESS);
854 rc = dt_declare_write_commit(env, o, lnb, niocount, th);
859 /* update [mac]time if needed */
860 rc = dt_declare_attr_set(env, o, la, th);
865 rc = ofd_trans_start(env, ofd, fo, th);
869 rc = dt_write_commit(env, o, lnb, niocount, th);
874 rc = dt_attr_set(env, o, la, th, ofd_object_capa(env, fo));
879 /* get attr to return */
880 rc = dt_attr_get(env, o, la, ofd_object_capa(env, fo));
883 /* Force commit to make the just-deleted blocks
884 * reusable. LU-456 */
888 /* do this before trans stop in case commit has finished */
889 if (!th->th_sync && soft_sync && !cb_registered) {
890 ofd_soft_sync_cb_add(th, exp);
891 cb_registered = true;
894 ofd_trans_stop(env, ofd, th, rc);
895 if (rc == -ENOSPC && retries++ < 3) {
896 CDEBUG(D_INODE, "retry after force commit, retries:%d\n",
902 /* reset fed_soft_sync_count upon non-SOFT_SYNC RPC */
903 atomic_set(&fed->fed_soft_sync_count, 0);
904 else if (atomic_inc_return(&fed->fed_soft_sync_count) ==
905 ofd->ofd_soft_sync_limit)
906 dt_commit_async(env, ofd->ofd_osd);
909 dt_bufs_put(env, o, lnb, niocount);
910 ofd_read_unlock(env, fo);
911 ofd_object_put(env, fo);
912 /* second put is pair to object_get in ofd_preprw_write */
913 ofd_object_put(env, fo);
914 ofd_grant_commit(env, info->fti_exp, old_rc);
/*
 * Commit a bulk read or write: entry point of the post-I/O phase.
 *
 * \param[in] env	execution environment
 * \param[in] cmd	OBD_BRW_WRITE or OBD_BRW_READ
 * \param[in] exp	export of the client doing the I/O
 * \param[in] oa	obdo from the client, updated with the resulting
 *			attributes and quota flags
 * \param[in] objcount	number of objects
 * \param[in] obj	I/O object descriptor
 * \param[in] rnb	remote buffers
 * \param[in] npages	number of local buffers
 * \param[in] lnb	local buffers
 * \param[in] oti	transaction info
 *
 * For a write, the attributes to apply are taken from \a oa: UID/GID
 * always, and the timestamps only if no file modification data is found
 * for the object or its fmd_mactime_xid is older than the current xid.
 * The filter FID is prepared when the client sent OBD_MD_FLFID and the
 * work is delegated to ofd_commitrw_write(). The over-quota flags of
 * the first local buffer are then reported back in oa->o_flags.
 *
 * For a read, the LVB of the matching LDLM resource is updated when the
 * namespace provides lvbo_update, and ofd_commitrw_read() is called.
 */
918 int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
919 struct obdo *oa, int objcount, struct obd_ioobj *obj,
920 struct niobuf_remote *rnb, int npages,
921 struct niobuf_local *lnb, struct obd_trans_info *oti,
924 struct ofd_thread_info *info = ofd_info(env);
925 struct ofd_mod_data *fmd;
927 struct ofd_device *ofd = ofd_exp(exp);
928 struct filter_fid *ff = NULL;
929 const struct lu_fid *fid = &oa->o_oi.oi_fid;
934 if (cmd == OBD_BRW_WRITE) {
935 /* Don't update timestamps if this write is older than a
936 * setattr which modifies the timestamps. b=10150 */
938 /* XXX when we start having persistent reservations this needs
939 * to be changed to ofd_fmd_get() to create the fmd if it
940 * doesn't already exist so we can store the reservation handle
942 valid = OBD_MD_FLUID | OBD_MD_FLGID;
943 fmd = ofd_fmd_find(exp, fid);
944 if (!fmd || fmd->fmd_mactime_xid < info->fti_xid)
945 valid |= OBD_MD_FLATIME | OBD_MD_FLMTIME |
947 ofd_fmd_put(exp, fmd);
948 la_from_obdo(&info->fti_attr, oa, valid);
950 if (oa->o_valid & OBD_MD_FLFID) {
951 ff = &info->fti_mds_fid;
952 ofd_prepare_fidea(ff, oa);
955 rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr,
956 ff, objcount, npages, lnb, old_rc);
958 obdo_from_la(oa, &info->fti_attr,
959 OFD_VALID_FLAGS | LA_GID | LA_UID);
961 obdo_from_la(oa, &info->fti_attr, LA_GID | LA_UID);
963 /* don't report overquota flag if we failed before reaching
965 if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) {
966 /* return the overquota flags to client */
967 if (lnb[0].lnb_flags & OBD_BRW_OVER_USRQUOTA) {
968 if (oa->o_valid & OBD_MD_FLFLAGS)
969 oa->o_flags |= OBD_FL_NO_USRQUOTA;
971 oa->o_flags = OBD_FL_NO_USRQUOTA;
974 if (lnb[0].lnb_flags & OBD_BRW_OVER_GRPQUOTA) {
975 if (oa->o_valid & OBD_MD_FLFLAGS)
976 oa->o_flags |= OBD_FL_NO_GRPQUOTA;
978 oa->o_flags = OBD_FL_NO_GRPQUOTA;
981 oa->o_valid |= OBD_MD_FLFLAGS;
982 oa->o_valid |= OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA;
984 } else if (cmd == OBD_BRW_READ) {
985 struct ldlm_namespace *ns = ofd->ofd_namespace;
987 /* If oa != NULL then ofd_preprw_read updated the inode
988 * atime and we should update the lvb so that other glimpses
989 * will also get the updated value. bug 5972 */
990 if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) {
991 struct ldlm_resource *rs = NULL;
993 ost_fid_build_resid(fid, &info->fti_resid);
994 rs = ldlm_resource_get(ns, NULL, &info->fti_resid,
997 ns->ns_lvbo->lvbo_update(rs, NULL, 1);
998 ldlm_resource_putref(rs);
1001 rc = ofd_commitrw_read(env, ofd, fid, objcount,
1011 ofd_info2oti(info, oti);