Whamcloud - gitweb
eccefecd257fe1eb9c546cdac85ce77530a213b7
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_open.c
32  *
33  * Lustre Metadata Target (mdt) open/close file handling
34  *
35  * Author: Huang Hua <huanghua@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <lustre_acl.h>
41 #include <lustre_mds.h>
42 #include <lustre_swab.h>
43 #include "mdt_internal.h"
44 #include <lustre_nodemap.h>
45
/* Owner tag for the open handles of this file: it is supplied both when a
 * new mfd handle is hashed and when a cookie is looked up again. */
static const char mfd_open_handle_owner[] = "mdt";

/* Forward declaration: defined further down in this file, but already
 * called by mdt_reconstruct_open(). */
static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep,
                           struct mdt_lock_handle *lhc);
50
51 /* Create a new mdt_file_data struct, initialize it,
52  * and insert it to global hash table */
53 struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
54 {
55         struct mdt_file_data *mfd;
56
57         ENTRY;
58         OBD_ALLOC_PTR(mfd);
59         if (mfd != NULL) {
60                 refcount_set(&mfd->mfd_open_handle.h_ref, 1);
61                 INIT_HLIST_NODE(&mfd->mfd_open_handle.h_link);
62                 mfd->mfd_owner = med;
63                 INIT_LIST_HEAD(&mfd->mfd_list);
64                 class_handle_hash(&mfd->mfd_open_handle, mfd_open_handle_owner);
65         }
66
67         RETURN(mfd);
68 }
69
70 /*
71  * Find the mfd pointed to by handle in global hash table.
72  * In case of replay the handle is obsoleted
73  * but mfd can be found in mfd list by that handle.
74  * Callers need to be holding med_open_lock.
75  */
76 struct mdt_file_data *mdt_open_handle2mfd(struct mdt_export_data *med,
77                                         const struct lustre_handle *open_handle,
78                                         bool is_replay_or_resent)
79 {
80         struct mdt_file_data   *mfd;
81
82         ENTRY;
83         LASSERT(open_handle != NULL);
84         mfd = class_handle2object(open_handle->cookie, mfd_open_handle_owner);
85         if (mfd)
86                 refcount_dec(&mfd->mfd_open_handle.h_ref);
87
88         /* during dw/setattr replay the mfd can be found by old handle */
89         if ((!mfd || mfd->mfd_owner != med) && is_replay_or_resent) {
90                 list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
91                         if (mfd->mfd_open_handle_old.cookie ==
92                             open_handle->cookie)
93                                 RETURN(mfd);
94                 }
95                 mfd = NULL;
96         }
97
98         RETURN(mfd);
99 }
100
101 /* free mfd */
102 void mdt_mfd_free(struct mdt_file_data *mfd)
103 {
104         LASSERT(refcount_read(&mfd->mfd_open_handle.h_ref) == 1);
105         LASSERT(list_empty(&mfd->mfd_list));
106         OBD_FREE_PRE(mfd, sizeof(*mfd), "rcu");
107         kfree_rcu(mfd, mfd_open_handle.h_rcu);
108 }
109
110 static int mdt_create_data(struct mdt_thread_info *info,
111                            struct mdt_object *p, struct mdt_object *o)
112 {
113         struct md_op_spec *spec = &info->mti_spec;
114         struct md_attr *ma = &info->mti_attr;
115         int rc = 0;
116
117         ENTRY;
118         if (!md_should_create(spec->sp_cr_flags))
119                 RETURN(0);
120
121         ma->ma_need = MA_INODE | MA_LOV;
122         ma->ma_valid = 0;
123         mutex_lock(&o->mot_lov_mutex);
124         if (!o->mot_lov_created) {
125                 rc = mdo_create_data(info->mti_env,
126                                      p ? mdt_object_child(p) : NULL,
127                                      mdt_object_child(o), spec, ma);
128                 if (rc == 0)
129                         rc = mdt_attr_get_complex(info, o, ma);
130
131                 if (rc == 0 && ma->ma_valid & MA_LOV)
132                         o->mot_lov_created = 1;
133         }
134
135         mutex_unlock(&o->mot_lov_mutex);
136         RETURN(rc);
137 }
138
139 int mdt_write_read(struct mdt_object *o)
140 {
141         int rc = 0;
142
143         ENTRY;
144         spin_lock(&o->mot_write_lock);
145         rc = o->mot_write_count;
146         spin_unlock(&o->mot_write_lock);
147         RETURN(rc);
148 }
149
150 int mdt_write_get(struct mdt_object *o)
151 {
152         int rc = 0;
153
154         ENTRY;
155         spin_lock(&o->mot_write_lock);
156         if (o->mot_write_count < 0)
157                 rc = -ETXTBSY;
158         else
159                 o->mot_write_count++;
160         spin_unlock(&o->mot_write_lock);
161
162         RETURN(rc);
163 }
164
165 void mdt_write_put(struct mdt_object *o)
166 {
167         ENTRY;
168         spin_lock(&o->mot_write_lock);
169         o->mot_write_count--;
170         spin_unlock(&o->mot_write_lock);
171         EXIT;
172 }
173
174 static int mdt_write_deny(struct mdt_object *o)
175 {
176         int rc = 0;
177
178         ENTRY;
179         spin_lock(&o->mot_write_lock);
180         if (o->mot_write_count > 0)
181                 rc = -ETXTBSY;
182         else
183                 o->mot_write_count--;
184         spin_unlock(&o->mot_write_lock);
185         RETURN(rc);
186 }
187
188 static void mdt_write_allow(struct mdt_object *o)
189 {
190         ENTRY;
191         spin_lock(&o->mot_write_lock);
192         o->mot_write_count++;
193         spin_unlock(&o->mot_write_lock);
194         EXIT;
195 }
196
197 /* there can be no real transaction so prepare the fake one */
198 static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
199 {
200         struct mdt_device *mdt = info->mti_mdt;
201         struct ptlrpc_request *req = mdt_info_req(info);
202         struct tg_export_data *ted;
203         struct lsd_client_data *lcd;
204
205         ENTRY;
206         if (mdt_rdonly(req->rq_export))
207                 RETURN_EXIT;
208
209         /* transaction has occurred already */
210         if (lustre_msg_get_transno(req->rq_repmsg) != 0)
211                 RETURN_EXIT;
212
213         if (tgt_is_multimodrpcs_client(req->rq_export)) {
214                 struct thandle *th;
215
216                 /* generate an empty transaction to get a transno
217                  * and reply data */
218                 th = dt_trans_create(info->mti_env, mdt->mdt_bottom);
219                 if (!IS_ERR(th)) {
220                         rc = dt_trans_start(info->mti_env, mdt->mdt_bottom, th);
221                         dt_trans_stop(info->mti_env, mdt->mdt_bottom, th);
222                 }
223                 RETURN_EXIT;
224         }
225
226         spin_lock(&mdt->mdt_lut.lut_translock);
227         if (rc != 0) {
228                 if (info->mti_transno != 0) {
229                         struct obd_export *exp = req->rq_export;
230
231                         CERROR("%s: replay trans %llu NID %s: rc = %d\n",
232                                mdt_obd_name(mdt), info->mti_transno,
233                                obd_export_nid2str(exp), rc);
234                         spin_unlock(&mdt->mdt_lut.lut_translock);
235                         RETURN_EXIT;
236                 }
237         } else if (info->mti_transno == 0) {
238                 info->mti_transno = ++mdt->mdt_lut.lut_last_transno;
239         } else {
240                 /* should be replay */
241                 if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
242                         mdt->mdt_lut.lut_last_transno = info->mti_transno;
243         }
244         spin_unlock(&mdt->mdt_lut.lut_translock);
245
246         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
247                info->mti_transno,
248                req->rq_export->exp_obd->obd_last_committed);
249
250         req->rq_transno = info->mti_transno;
251         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
252
253         /* update lcd in memory only for resent cases */
254         ted = &req->rq_export->exp_target_data;
255         LASSERT(ted);
256         mutex_lock(&ted->ted_lcd_lock);
257         lcd = ted->ted_lcd;
258         if (info->mti_transno < lcd->lcd_last_transno &&
259             info->mti_transno != 0) {
260                 /* This should happen during replay. Do not update
261                  * last rcvd info if replay req transno < last transno,
262                  * otherwise the following resend(after replay) can not
263                  * be checked correctly by xid */
264                 mutex_unlock(&ted->ted_lcd_lock);
265                 CDEBUG(D_HA, "%s: transno = %llu < last_transno = %llu\n",
266                        mdt_obd_name(mdt), info->mti_transno,
267                        lcd->lcd_last_transno);
268                 RETURN_EXIT;
269         }
270
271         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
272                 if (info->mti_transno != 0)
273                         lcd->lcd_last_close_transno = info->mti_transno;
274                 lcd->lcd_last_close_xid = req->rq_xid;
275                 lcd->lcd_last_close_result = rc;
276         } else {
277                 /* VBR: save versions in last_rcvd for reconstruct. */
278                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
279
280                 if (pre_versions) {
281                         lcd->lcd_pre_versions[0] = pre_versions[0];
282                         lcd->lcd_pre_versions[1] = pre_versions[1];
283                         lcd->lcd_pre_versions[2] = pre_versions[2];
284                         lcd->lcd_pre_versions[3] = pre_versions[3];
285                 }
286                 if (info->mti_transno != 0)
287                         lcd->lcd_last_transno = info->mti_transno;
288
289                 lcd->lcd_last_xid = req->rq_xid;
290                 lcd->lcd_last_result = rc;
291                 lcd->lcd_last_data = info->mti_opdata;
292         }
293         mutex_unlock(&ted->ted_lcd_lock);
294
295         EXIT;
296 }
297
298 void mdt_mfd_set_mode(struct mdt_file_data *mfd, u64 open_flags)
299 {
300         LASSERT(mfd != NULL);
301
302         CDEBUG(D_DENTRY, DFID " Change mfd open_flags %#llo -> %#llo.\n",
303                PFID(mdt_object_fid(mfd->mfd_object)), mfd->mfd_open_flags,
304                open_flags);
305
306         mfd->mfd_open_flags = open_flags;
307 }
308
309 /**
310  * prep ma_lmm/ma_lmv for md_attr from reply
311  */
312 static void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
313                                      struct mdt_object *obj,
314                                      struct md_attr *ma)
315 {
316         if (ma->ma_lmv || ma->ma_lmm) {
317                 CDEBUG(D_INFO, DFID " %s already set.\n",
318                        PFID(mdt_object_fid(obj)),
319                        ma->ma_lmv ? (ma->ma_lmm ? "ma_lmv and ma_lmm"
320                                                 : "ma_lmv")
321                                   : "ma_lmm");
322                 return;
323         }
324
325         if (S_ISDIR(obj->mot_header.loh_attr)) {
326                 ma->ma_lmv = req_capsule_server_get(info->mti_pill,
327                                                     &RMF_MDT_MD);
328                 ma->ma_lmv_size = req_capsule_get_size(info->mti_pill,
329                                                        &RMF_MDT_MD,
330                                                        RCL_SERVER);
331                 if (ma->ma_lmv_size > 0)
332                         ma->ma_need |= MA_LMV;
333         } else {
334                 ma->ma_lmm = req_capsule_server_get(info->mti_pill,
335                                                     &RMF_MDT_MD);
336                 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
337                                                        &RMF_MDT_MD,
338                                                        RCL_SERVER);
339                 if (ma->ma_lmm_size > 0)
340                         ma->ma_need |= MA_LOV;
341         }
342 }
343
344 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
345                         struct mdt_object *o, u64 open_flags, int created,
346                         struct ldlm_reply *rep)
347 {
348         struct ptlrpc_request *req = mdt_info_req(info);
349         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
350         struct mdt_file_data *mfd;
351         struct md_attr *ma  = &info->mti_attr;
352         struct lu_attr *la  = &ma->ma_attr;
353         struct mdt_body *repbody;
354         bool isdir, isreg;
355         int rc = 0;
356
357         ENTRY;
358         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
359
360         isreg = S_ISREG(la->la_mode);
361         isdir = S_ISDIR(la->la_mode);
362         if (isreg && !(ma->ma_valid & MA_LOV) &&
363             !(open_flags & MDS_OPEN_RELEASE)) {
364                 /*
365                  * No EA, check whether it is will set regEA and dirEA since in
366                  * above attr get, these size might be zero, so reset it, to
367                  * retrieve the MD after create obj.
368                  */
369                 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
370                                                        &RMF_MDT_MD,
371                                                        RCL_SERVER);
372                 /* in replay case, p == NULL */
373                 rc = mdt_create_data(info, p, o);
374                 if (rc)
375                         RETURN(rc);
376
377                 if (exp_connect_flags(req->rq_export) & OBD_CONNECT_DISP_STRIPE)
378                         mdt_set_disposition(info, rep, DISP_OPEN_STRIPE);
379         }
380
381         CDEBUG(D_INODE, "after open, ma_valid bit = %#llx lmm_size = %d\n",
382                ma->ma_valid, ma->ma_lmm_size);
383
384         if (ma->ma_valid & MA_LOV) {
385                 LASSERT(ma->ma_lmm_size != 0);
386                 repbody->mbo_eadatasize = ma->ma_lmm_size;
387                 if (isdir)
388                         repbody->mbo_valid |= OBD_MD_FLDIREA;
389                 else
390                         repbody->mbo_valid |= OBD_MD_FLEASIZE;
391         }
392
393         if (ma->ma_valid & MA_LMV) {
394                 LASSERT(ma->ma_lmv_size != 0);
395                 repbody->mbo_eadatasize = ma->ma_lmv_size;
396                 LASSERT(isdir);
397                 repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_MEA;
398         }
399
400         if (open_flags & MDS_FMODE_WRITE)
401                 rc = mdt_write_get(o);
402         else if (open_flags & MDS_FMODE_EXEC)
403                 rc = mdt_write_deny(o);
404
405         if (rc)
406                 RETURN(rc);
407
408         rc = mo_open(info->mti_env, mdt_object_child(o),
409                      created ? open_flags | MDS_OPEN_CREATED : open_flags,
410                      &info->mti_spec);
411         if (rc != 0) {
412                 /* If we allow the client to chgrp (CFS_SETGRP_PERM), but the
413                  * client does not know which suppgid should be sent to the MDS,
414                  * or some other(s) changed the target file's GID after this RPC
415                  * sent to the MDS with the suppgid as the original GID, then we
416                  * give the client another chance to send the right suppgid. */
417                 if (rc == -EACCES &&
418                     allow_client_chgrp(info, lu_ucred(info->mti_env)))
419                         mdt_set_disposition(info, rep, DISP_OPEN_DENY);
420
421                 GOTO(err_out, rc);
422         }
423
424         mfd = mdt_mfd_new(med);
425         if (mfd == NULL)
426                 GOTO(err_out, rc = -ENOMEM);
427
428         /*
429          * Keep a reference on this object for this open, and is
430          * released by mdt_mfd_close().
431          */
432         mdt_object_get(info->mti_env, o);
433         mfd->mfd_object = o;
434         mfd->mfd_xid = req->rq_xid;
435
436         /*
437          * @open_flags is always not zero. At least it should be FMODE_READ,
438          * FMODE_WRITE or MDS_FMODE_EXEC.
439          */
440         LASSERT(open_flags != 0);
441
442         /* Open handling. */
443         mdt_mfd_set_mode(mfd, open_flags);
444
445         atomic_inc(&o->mot_open_count);
446         if (open_flags & MDS_OPEN_LEASE)
447                 atomic_inc(&o->mot_lease_count);
448
449         /* replay handle */
450         if (req_is_replay(req)) {
451                 struct mdt_file_data *old_mfd;
452                 /* Check wheather old cookie already exist in
453                  * the list, becasue when do recovery, client
454                  * might be disconnected from server, and
455                  * restart replay, so there maybe some orphan
456                  * mfd here, we should remove them */
457                 LASSERT(info->mti_rr.rr_open_handle != NULL);
458                 spin_lock(&med->med_open_lock);
459                 old_mfd = mdt_open_handle2mfd(med, info->mti_rr.rr_open_handle,
460                                               true);
461                 if (old_mfd != NULL) {
462                         CDEBUG(D_HA, "delete orphan mfd = %p, fid = "DFID", "
463                                "cookie = %#llx\n", mfd,
464                                PFID(mdt_object_fid(mfd->mfd_object)),
465                                info->mti_rr.rr_open_handle->cookie);
466                         class_handle_unhash(&old_mfd->mfd_open_handle);
467                         list_del_init(&old_mfd->mfd_list);
468                         spin_unlock(&med->med_open_lock);
469                         /* no attr update for that close */
470                         la->la_valid = 0;
471                         ma->ma_valid |= MA_FLAGS;
472                         ma->ma_attr_flags |= MDS_RECOV_OPEN;
473                         mdt_mfd_close(info, old_mfd);
474                         ma->ma_attr_flags &= ~MDS_RECOV_OPEN;
475                         ma->ma_valid &= ~MA_FLAGS;
476                 } else {
477                         spin_unlock(&med->med_open_lock);
478                         CDEBUG(D_HA, "orphan mfd not found, fid = "DFID", "
479                                "cookie = %#llx\n",
480                                PFID(mdt_object_fid(mfd->mfd_object)),
481                                info->mti_rr.rr_open_handle->cookie);
482                 }
483
484                 CDEBUG(D_HA, "Store old cookie %#llx in new mfd\n",
485                        info->mti_rr.rr_open_handle->cookie);
486
487                 mfd->mfd_open_handle_old = *info->mti_rr.rr_open_handle;
488         }
489
490         repbody->mbo_open_handle.cookie = mfd->mfd_open_handle.h_cookie;
491
492         if (req->rq_export->exp_disconnected) {
493                 spin_lock(&med->med_open_lock);
494                 class_handle_unhash(&mfd->mfd_open_handle);
495                 list_del_init(&mfd->mfd_list);
496                 spin_unlock(&med->med_open_lock);
497                 mdt_mfd_close(info, mfd);
498         } else {
499                 spin_lock(&med->med_open_lock);
500                 list_add_tail(&mfd->mfd_list, &med->med_open_head);
501                 spin_unlock(&med->med_open_lock);
502         }
503
504         mdt_empty_transno(info, rc);
505
506         RETURN(rc);
507
508 err_out:
509         if (open_flags & MDS_FMODE_WRITE)
510                 mdt_write_put(o);
511         else if (open_flags & MDS_FMODE_EXEC)
512                 mdt_write_allow(o);
513
514         return rc;
515 }
516
517 static int mdt_finish_open(struct mdt_thread_info *info,
518                            struct mdt_object *p, struct mdt_object *o,
519                            u64 open_flags,
520                            struct ldlm_reply *rep)
521 {
522         struct ptlrpc_request *req = mdt_info_req(info);
523         struct obd_export *exp = req->rq_export;
524         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
525         struct md_attr *ma  = &info->mti_attr;
526         struct lu_attr *la  = &ma->ma_attr;
527         struct mdt_file_data *mfd;
528         struct mdt_body *repbody;
529         int created;
530         int rc = 0;
531         int isreg, isdir, islnk;
532         struct list_head *t;
533
534         ENTRY;
535         LASSERT(ma->ma_valid & MA_INODE);
536         created = mdt_get_disposition(rep, DISP_OPEN_CREATE);
537
538         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
539
540         isreg = S_ISREG(la->la_mode);
541         isdir = S_ISDIR(la->la_mode);
542         islnk = S_ISLNK(la->la_mode);
543         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
544
545         /* compatibility check for 2.10 clients when it tries to open mirrored
546          * files. 2.10 clients don't verify overlapping components so they
547          * would read and write mirrored files just as if they were normal
548          * PFL files, which will cause the problem that sycned mirrors actually
549          * contain different data.
550          * Older clients are not a concern here because they don't even
551          * understand PFL layout. */
552         if (isreg && !exp_connect_flr(exp) && ma->ma_valid & MA_LOV &&
553             mdt_lmm_is_flr(ma->ma_lmm)) {
554                 /* LU-10286: for simplicity clients who don't understand
555                  * mirrored layout(with connect flag OBD_CONNECT2_FLR) won't
556                  * be able to open mirrored files */
557                 RETURN(-EOPNOTSUPP);
558         }
559
560         /* Overstriped files can crash older clients */
561         if (isreg && !exp_connect_overstriping(exp) &&
562             mdt_lmm_is_overstriping(ma->ma_lmm))
563                 RETURN(-EOPNOTSUPP);
564
565         /* LU-2275, simulate broken behaviour (esp. prevalent in
566          * pre-2.4 servers where a very strange reply is sent on error
567          * that looks like it was actually almost successful and a
568          * failure at the same time.) */
569         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_NEGATIVE_POSITIVE)) {
570                 mdt_set_disposition(info, rep, DISP_OPEN_OPEN |
571                                                DISP_LOOKUP_NEG |
572                                                DISP_LOOKUP_POS);
573
574                 if (open_flags & MDS_OPEN_LOCK)
575                         mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
576
577                 RETURN(-ENOENT);
578         }
579
580 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
581         if (exp_connect_flags(exp) & OBD_CONNECT_ACL) {
582                 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
583                 if (IS_ERR(nodemap))
584                         RETURN(PTR_ERR(nodemap));
585
586                 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
587                 nodemap_putref(nodemap);
588                 if (rc)
589                         RETURN(rc);
590         }
591 #endif
592
593         /*
594          * If we are following a symlink, don't open; and do not return open
595          * handle for special nodes as client required.
596          */
597         if (islnk || (!isreg && !isdir &&
598             (exp_connect_flags(req->rq_export) & OBD_CONNECT_NODEVOH))) {
599                 lustre_msg_set_transno(req->rq_repmsg, 0);
600                 RETURN(0);
601         }
602
603         /*
604          * We need to return the existing object's fid back, so it is done here,
605          * after preparing the reply.
606          */
607         if (!created && (open_flags & MDS_OPEN_EXCL) &&
608             (open_flags & MDS_OPEN_CREAT))
609                 RETURN(-EEXIST);
610
611         /* This can't be done earlier, we need to return reply body */
612         if (isdir) {
613                 if (open_flags & (MDS_OPEN_CREAT | MDS_FMODE_WRITE)) {
614                         /* We are trying to create or write an existing dir. */
615                         RETURN(-EISDIR);
616                 }
617         } else if (open_flags & MDS_OPEN_DIRECTORY)
618                 RETURN(-ENOTDIR);
619
620         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_OPEN_CREATE,
621                                  OBD_FAIL_MDS_LDLM_REPLY_NET | OBD_FAIL_ONCE))
622                 RETURN(-EAGAIN);
623
624         mfd = NULL;
625         if (info->mti_rr.rr_flags & MRF_OPEN_RESEND) {
626                 spin_lock(&med->med_open_lock);
627                 list_for_each(t, &med->med_open_head) {
628                         mfd = list_entry(t, struct mdt_file_data, mfd_list);
629                         if (mfd->mfd_xid == req->rq_xid) {
630                                 repbody->mbo_open_handle.cookie =
631                                                 mfd->mfd_open_handle.h_cookie;
632                                 break;
633                         }
634                         mfd = NULL;
635                 }
636                 spin_unlock(&med->med_open_lock);
637
638                 if (mfd != NULL) {
639                         /* set repbody->ea_size for resent case */
640                         if (ma->ma_valid & MA_LOV) {
641                                 LASSERT(ma->ma_lmm_size != 0);
642                                 repbody->mbo_eadatasize = ma->ma_lmm_size;
643                                 if (isdir)
644                                         repbody->mbo_valid |= OBD_MD_FLDIREA;
645                                 else
646                                         repbody->mbo_valid |= OBD_MD_FLEASIZE;
647                         }
648                         mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
649                         RETURN(0);
650                 }
651                 /* if we have a real resend (not a resend afrer failover), it
652                  * means close is already happend, so lets return error
653                  */
654                 RETURN(-ESTALE);
655         }
656
657         rc = mdt_mfd_open(info, p, o, open_flags, created, rep);
658         if (!rc)
659                 mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
660
661         RETURN(rc);
662 }
663
664 void mdt_reconstruct_open(struct mdt_thread_info *info,
665                           struct mdt_lock_handle *lhc)
666 {
667         struct req_capsule *pill = info->mti_pill;
668         struct ptlrpc_request *req = mdt_info_req(info);
669         struct mdt_reint_record *rr = &info->mti_rr;
670         struct md_attr *ma = &info->mti_attr;
671         struct ldlm_reply *ldlm_rep;
672         u64 opdata;
673         int rc;
674
675         ENTRY;
676         LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
677         ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
678
679         ma->ma_need = MA_INODE | MA_HSM;
680         ma->ma_valid = 0;
681         opdata = mdt_req_from_lrd(req, info->mti_reply_data);
682         mdt_set_disposition(info, ldlm_rep, opdata);
683
684         CDEBUG(D_INODE, "This is reconstruct open: disp=%#llx, result=%d\n",
685                ldlm_rep->lock_policy_res1, req->rq_status);
686         if (req->rq_status)
687                 /* We did not create successfully, return error to client. */
688                 GOTO(out, rc = req->rq_status);
689
690         /* tg_reply_data is just memory only  structure, so any non zero fid
691          * means a real resend not a resend after recovery which need to be
692          * handled as regular open
693          */
694         if (likely(!fid_is_zero(&info->mti_reply_data->trd_object))) {
695                 rr->rr_fid2 = &info->mti_reply_data->trd_object;
696                 rr->rr_flags |= MRF_OPEN_RESEND;
697                 rc = mdt_open_by_fid(info, ldlm_rep, lhc);
698                 if (rc)
699                         lustre_msg_set_transno(req->rq_repmsg, 0);
700         } else {
701                 /* We did not try to create, so we are a pure open */
702                 rc = mdt_reint_open(info, lhc);
703         }
704         EXIT;
705 out:
706         req->rq_status = rc;
707         lustre_msg_set_status(req->rq_repmsg, req->rq_status);
708         LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
709 }
710
711 static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep,
712                            struct mdt_lock_handle *lhc)
713 {
714         u64 open_flags = info->mti_spec.sp_cr_flags;
715         struct mdt_reint_record *rr = &info->mti_rr;
716         struct md_attr *ma = &info->mti_attr;
717         struct mdt_object *o;
718         int rc;
719
720         ENTRY;
721         o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
722         if (IS_ERR(o))
723                 RETURN(rc = PTR_ERR(o));
724
725         rc = mdt_check_enc(info, o);
726         if (rc)
727                 GOTO(out, rc);
728
729         if (unlikely(mdt_object_remote(o))) {
730                 /* the child object was created on remote server */
731                 struct mdt_body *repbody;
732
733                 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
734                                                 DISP_LOOKUP_EXECD |
735                                                 DISP_LOOKUP_POS));
736                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
737                 repbody->mbo_fid1 = *rr->rr_fid2;
738                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
739                 rc = 0;
740         } else {
741                 if (mdt_object_exists(o)) {
742                         tgt_open_obj_set(info->mti_env, mdt_obj2dt(o));
743                         mdt_set_disposition(info, rep, (DISP_IT_EXECD |
744                                                         DISP_LOOKUP_EXECD |
745                                                         DISP_LOOKUP_POS));
746                         if ((open_flags & MDS_OPEN_EXCL) &&
747                             (open_flags & MDS_OPEN_CREAT))
748                                 mdt_set_disposition(info, rep,
749                                                     DISP_OPEN_CREATE);
750
751                         mdt_prep_ma_buf_from_rep(info, o, ma);
752                         rc = mdt_attr_get_complex(info, o, ma);
753                         if (rc)
754                                 GOTO(out, rc);
755                         rc = mdt_finish_open(info, NULL, o, open_flags, rep);
756                         if (rc)
757                                 GOTO(out, rc);
758                         mdt_pack_size2body(info, rr->rr_fid2, &lhc->mlh_reg_lh);
759                 } else {
760                         rc = -ENOENT;
761                 }
762         }
763
764 out:
765         mdt_object_put(info->mti_env, o);
766         RETURN(rc);
767 }
768
769 /* lock object for open */
770 static int mdt_object_open_lock(struct mdt_thread_info *info,
771                                 struct mdt_object *obj,
772                                 struct mdt_lock_handle *lhc,
773                                 __u64 *ibits)
774 {
775         struct md_attr *ma = &info->mti_attr;
776         __u64 open_flags = info->mti_spec.sp_cr_flags;
777         __u64 trybits = 0;
778         enum ldlm_mode lm = LCK_CR;
779         bool acq_lease = !!(open_flags & MDS_OPEN_LEASE);
780         bool try_layout = false;
781         bool create_layout = false;
782         int rc = 0;
783         __u32 dom_stripe = 0;
784         unsigned int dom_only = 0;
785         unsigned int dom_lock = 0;
786
787         ENTRY;
788         *ibits = 0;
789         mdt_lock_handle_init(lhc);
790
791         if (req_is_replay(mdt_info_req(info)))
792                 RETURN(0);
793
794         if (S_ISREG(lu_object_attr(&obj->mot_obj))) {
795                 if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
796                     md_should_create(open_flags))
797                         create_layout = true;
798                 if (exp_connect_layout(info->mti_exp) && !create_layout &&
799                     ma->ma_need & MA_LOV)
800                         try_layout = true;
801
802                 /* DoM files can take IO lock at OPEN when it makes sense,
803                  * check if file has DoM stripe and ask for lock if client
804                  * no lock on that resource yet.
805                  */
806                 if (ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)
807                         dom_stripe = mdt_lmm_dom_entry_check(ma->ma_lmm,
808                                                              &dom_only);
809                 /* If only DOM stripe is being used then we can expect IO
810                  * to it after OPEN and will return corresponding DOM ibit
811                  * using default strategy from mdt_opts.mo_dom_lock.
812                  * Otherwise trylock mode is used always and DOM ibit will
813                  * be returned optionally.
814                  */
815                 if (dom_stripe &&
816                     !mdt_dom_client_has_lock(info, mdt_object_fid(obj)))
817                         dom_lock = !dom_only ? TRYLOCK_DOM_ON_OPEN :
818                                    info->mti_mdt->mdt_opts.mo_dom_lock;
819         }
820
821         if (acq_lease) {
822                 /* lease open, acquire write mode of open sem */
823                 down_write(&obj->mot_open_sem);
824
825                 /* Lease exists and ask for new lease */
826                 if (atomic_read(&obj->mot_lease_count) > 0) {
827                         /* only exclusive open is supported, so lease
828                          * are conflicted to each other */
829                         GOTO(out, rc = -EBUSY);
830                 }
831
832                 /* Lease must be with open lock */
833                 if (!(open_flags & MDS_OPEN_LOCK)) {
834                         CERROR("%s: Request lease for file:"DFID ", but open lock "
835                                "is missed, open_flags = %#llo : rc = %d\n",
836                                mdt_obd_name(info->mti_mdt),
837                                PFID(mdt_object_fid(obj)), open_flags, -EPROTO);
838                         GOTO(out, rc = -EPROTO);
839                 }
840
841                 /* should conflict with new opens for write/execute */
842                 lm = LCK_PW;
843                 *ibits = MDS_INODELOCK_OPEN;
844
845                 /* never grant LCK_EX layout lock to client */
846                 try_layout = false;
847         } else { /* normal open */
848                 /* normal open holds read mode of open sem */
849                 down_read(&obj->mot_open_sem);
850
851                 if (open_flags & MDS_OPEN_LOCK) {
852                         if (open_flags & MDS_FMODE_WRITE)
853                                 lm = LCK_CW;
854                         else if (open_flags & MDS_FMODE_EXEC)
855                                 lm = LCK_PR;
856                         else
857                                 lm = LCK_CR;
858
859                         *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
860                 } else if (atomic_read(&obj->mot_lease_count) > 0) {
861                         if (open_flags & MDS_FMODE_WRITE)
862                                 lm = LCK_CW;
863                         else
864                                 lm = LCK_CR;
865
866                         /* revoke lease */
867                         *ibits = MDS_INODELOCK_OPEN;
868                         try_layout = false;
869
870                         lhc = &info->mti_lh[MDT_LH_LOCAL];
871                 } else if (dom_lock) {
872                         lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR;
873                         trybits |= MDS_INODELOCK_DOM | MDS_INODELOCK_LAYOUT;
874                 }
875
876                 CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
877                         PFID(mdt_object_fid(obj)),
878                         atomic_read(&obj->mot_lease_count), lm);
879         }
880
881         mdt_lock_reg_init(lhc, lm);
882
883         /* Return lookup lock to validate inode at the client side.
884          * This is pretty important otherwise MDT will return layout
885          * lock for each open.
886          * However this is a double-edged sword because changing
887          * permission will revoke a huge number of LOOKUP locks.
888          */
889         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
890                 if (!(*ibits & MDS_INODELOCK_LOOKUP))
891                         trybits |= MDS_INODELOCK_LOOKUP;
892                 trybits |= MDS_INODELOCK_LAYOUT;
893         }
894
895         if (*ibits | trybits)
896                 rc = mdt_object_lock_try(info, obj, lhc, ibits, trybits, false);
897
898         CDEBUG(D_INODE, "%s: Requested bits lock:"DFID ", ibits = %#llx/%#llx"
899                ", open_flags = %#llo, try_layout = %d : rc = %d\n",
900                mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(obj)),
901                *ibits, trybits, open_flags, try_layout, rc);
902
903         /* will change layout, revoke layout locks by enqueuing EX lock. */
904         if (rc == 0 && create_layout) {
905                 struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
906
907                 CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
908                         ", open_flags = %#llo\n",
909                         PFID(mdt_object_fid(obj)), open_flags);
910
911                 /* We cannot enqueue another lock for the same resource we
912                  * already have a lock for, due to mechanics of waiting list
913                  * iterating in ldlm, see LU-3601.
914                  * As such we'll drop the open lock we just got above here,
915                  * it's ok not to have this open lock as it's main purpose is to
916                  * flush unused cached client open handles. */
917                 if (lustre_handle_is_used(&lhc->mlh_reg_lh))
918                         mdt_object_unlock(info, obj, lhc, 1);
919
920                 LASSERT(!try_layout);
921                 mdt_lock_handle_init(ll);
922                 mdt_lock_reg_init(ll, LCK_EX);
923                 rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT);
924
925                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
926         }
927
928         /* Check if there is any other open handles after acquiring
929          * open lock. At this point, caching open handles have been revoked
930          * by open lock.
931          * XXX: Now only exclusive open is supported. Need to check the
932          * type of open for generic lease support. */
933         if (rc == 0 && acq_lease) {
934                 struct ptlrpc_request *req = mdt_info_req(info);
935                 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
936                 struct mdt_file_data *mfd;
937                 bool is_replay_or_resent;
938                 int open_count = 0;
939
940                 /* For lease: application can open a file and then apply lease,
941                  * @handle contains original open handle in that case.
942                  * In recovery, open REQ will be replayed and the lease REQ may
943                  * be resent that means the open handle is already stale, so we
944                  * need to fix it up here by finding new handle. */
945                 is_replay_or_resent = req_is_replay(req) ||
946                         lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT;
947
948                 /* if the request is _not_ a replay request, rr_open_handle
949                  * may be used to hold an open file handle which is issuing the
950                  * lease request, so that this openhandle doesn't count. */
951                 mfd = mdt_open_handle2mfd(med, info->mti_rr.rr_open_handle,
952                                           is_replay_or_resent);
953                 if (mfd != NULL)
954                         ++open_count;
955
956                 CDEBUG(D_INODE, "acq_lease "DFID": openers: %d, want: %d\n",
957                         PFID(mdt_object_fid(obj)),
958                         atomic_read(&obj->mot_open_count), open_count);
959
960                 if (atomic_read(&obj->mot_open_count) > open_count) {
961                         /* fail if anyone *else* has opened file for write */
962                         if (mdt_write_read(obj) > 1)
963                                 GOTO(out, rc = -EBUSY);
964                 }
965         }
966         GOTO(out, rc);
967
968 out:
969         RETURN(rc);
970 }
971
972 static void mdt_object_open_unlock(struct mdt_thread_info *info,
973                                    struct mdt_object *obj,
974                                    struct mdt_lock_handle *lhc,
975                                    __u64 ibits, int rc)
976 {
977         __u64 open_flags = info->mti_spec.sp_cr_flags;
978         struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LOCAL];
979
980         ENTRY;
981         if (req_is_replay(mdt_info_req(info)))
982                 RETURN_EXIT;
983
984         /* Release local lock - the lock put in MDT_LH_LOCAL will never
985          * return to client side. */
986         if (lustre_handle_is_used(&ll->mlh_reg_lh))
987                 mdt_object_unlock(info, obj, ll, 1);
988
989         ll = &info->mti_lh[MDT_LH_LAYOUT];
990         /* Release local layout lock, layout was created */
991         if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
992                 LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
993                 mdt_object_unlock(info, obj, ll, 1);
994         }
995
996         if (open_flags & MDS_OPEN_LEASE)
997                 up_write(&obj->mot_open_sem);
998         else
999                 up_read(&obj->mot_open_sem);
1000
1001         /* Cross-ref case, the lock should be returned to the client */
1002         if (ibits == 0 || rc == -MDT_EREMOTE_OPEN)
1003                 RETURN_EXIT;
1004
1005         if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT) &&
1006             !(ibits & MDS_INODELOCK_DOM)) {
1007                 /* for the open request, the lock will only return to client
1008                  * if open or layout lock is granted. */
1009                 rc = 1;
1010         }
1011
1012         if (rc != 0 || !lustre_handle_is_used(&lhc->mlh_reg_lh)) {
1013                 struct ldlm_reply       *ldlm_rep;
1014
1015                 ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
1016                 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
1017                 if (lustre_handle_is_used(&lhc->mlh_reg_lh))
1018                         mdt_object_unlock(info, obj, lhc, 1);
1019         }
1020         RETURN_EXIT;
1021 }
1022
1023 /**
1024  * Check release is permitted for the current HSM flags.
1025  */
1026 static bool mdt_hsm_release_allow(const struct md_attr *ma)
1027 {
1028         if (!(ma->ma_valid & MA_HSM))
1029                 return false;
1030
1031         if (ma->ma_hsm.mh_flags & (HS_DIRTY|HS_NORELEASE|HS_LOST))
1032                 return false;
1033
1034         if (!(ma->ma_hsm.mh_flags & HS_ARCHIVED))
1035                 return false;
1036
1037         return true;
1038 }
1039
1040 static int mdt_open_by_fid_lock(struct mdt_thread_info *info,
1041                                 struct ldlm_reply *rep,
1042                                 struct mdt_lock_handle *lhc)
1043 {
1044         const struct lu_env *env = info->mti_env;
1045         struct mdt_device *mdt = info->mti_mdt;
1046         u64 open_flags = info->mti_spec.sp_cr_flags;
1047         struct mdt_reint_record *rr = &info->mti_rr;
1048         struct md_attr *ma = &info->mti_attr;
1049         struct mdt_object *parent = NULL;
1050         struct mdt_object *o;
1051         bool object_locked = false;
1052         u64 ibits = 0;
1053         int rc;
1054
1055         ENTRY;
1056         if (md_should_create(open_flags)) {
1057                 if (!lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
1058                         parent = mdt_object_find(env, mdt, rr->rr_fid1);
1059                         if (IS_ERR(parent)) {
1060                                 CDEBUG(D_INODE, "Fail to find parent "DFID
1061                                        " for anonymous created %ld, try to"
1062                                        " use server-side parent.\n",
1063                                        PFID(rr->rr_fid1), PTR_ERR(parent));
1064                                 parent = NULL;
1065                         }
1066                 }
1067                 if (parent == NULL)
1068                         ma->ma_need |= MA_PFID;
1069         }
1070
1071         o = mdt_object_find(env, mdt, rr->rr_fid2);
1072         if (IS_ERR(o))
1073                 GOTO(out_parent_put, rc = PTR_ERR(o));
1074
1075         if (mdt_object_remote(o)) {
1076                 CDEBUG(D_INFO, "%s: "DFID" is on remote MDT.\n",
1077                        mdt_obd_name(info->mti_mdt),
1078                        PFID(rr->rr_fid2));
1079                 GOTO(out, rc = -EREMOTE);
1080         } else if (!mdt_object_exists(o)) {
1081                 mdt_set_disposition(info, rep,
1082                                     DISP_IT_EXECD |
1083                                     DISP_LOOKUP_EXECD |
1084                                     DISP_LOOKUP_NEG);
1085                 GOTO(out, rc = -ENOENT);
1086         }
1087
1088         /* do not check enc for directory: always allow open */
1089         if (!S_ISDIR(lu_object_attr(&o->mot_obj))) {
1090                 rc = mdt_check_enc(info, o);
1091                 if (rc)
1092                         GOTO(out, rc);
1093         }
1094
1095         mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
1096
1097         mdt_prep_ma_buf_from_rep(info, o, ma);
1098         if (open_flags & MDS_OPEN_RELEASE)
1099                 ma->ma_need |= MA_HSM;
1100         rc = mdt_attr_get_complex(info, o, ma);
1101         if (rc)
1102                 GOTO(out, rc);
1103
1104         /* We should not change file's existing LOV EA */
1105         if (S_ISREG(lu_object_attr(&o->mot_obj)) &&
1106             open_flags & MDS_OPEN_HAS_EA && ma->ma_valid & MA_LOV)
1107                 GOTO(out, rc = -EEXIST);
1108
1109         /* If a release request, check file open flags are fine and ask for an
1110          * exclusive open access. */
1111         if (open_flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
1112                 GOTO(out, rc = -EPERM);
1113
1114         rc = mdt_check_resent_lock(info, o, lhc);
1115         if (rc < 0) {
1116                 GOTO(out, rc);
1117         } else if (rc > 0) {
1118                 rc = mdt_object_open_lock(info, o, lhc, &ibits);
1119                 object_locked = true;
1120                 if (rc)
1121                         GOTO(out_unlock, rc);
1122         }
1123
1124         if (ma->ma_valid & MA_PFID) {
1125                 parent = mdt_object_find(env, mdt, &ma->ma_pfid);
1126                 if (IS_ERR(parent)) {
1127                         CDEBUG(D_INODE, "Fail to find parent "DFID
1128                                " for anonymous created %ld, try to"
1129                                " use system default.\n",
1130                                PFID(&ma->ma_pfid), PTR_ERR(parent));
1131                         parent = NULL;
1132                 }
1133         }
1134
1135         tgt_open_obj_set(info->mti_env, mdt_obj2dt(o));
1136         rc = mdt_finish_open(info, parent, o, open_flags, rep);
1137         if (!rc) {
1138                 mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
1139                 if (open_flags & MDS_OPEN_LOCK)
1140                         mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
1141                 if (open_flags & MDS_OPEN_LEASE)
1142                         mdt_set_disposition(info, rep, DISP_OPEN_LEASE);
1143         }
1144         GOTO(out_unlock, rc);
1145
1146 out_unlock:
1147         if (object_locked)
1148                 mdt_object_open_unlock(info, o, lhc, ibits, rc);
1149 out:
1150         mdt_object_put(env, o);
1151         if (rc == 0)
1152                 mdt_pack_size2body(info, rr->rr_fid2, &lhc->mlh_reg_lh);
1153 out_parent_put:
1154         if (parent != NULL)
1155                 mdt_object_put(env, parent);
1156         return rc;
1157 }
1158
1159 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
1160 static int mdt_cross_open(struct mdt_thread_info *info,
1161                           const struct lu_fid *parent_fid,
1162                           const struct lu_fid *fid,
1163                           struct ldlm_reply *rep, u64 open_flags)
1164 {
1165         struct md_attr *ma = &info->mti_attr;
1166         struct mdt_object *o;
1167         int rc;
1168
1169         ENTRY;
1170         o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1171         if (IS_ERR(o))
1172                 RETURN(rc = PTR_ERR(o));
1173
1174         rc = mdt_check_enc(info, o);
1175         if (rc)
1176                 GOTO(out, rc);
1177
1178         if (mdt_object_remote(o)) {
1179                 /* Something is wrong here, the object is on another MDS! */
1180                 CERROR("%s: "DFID" isn't on this server!: rc = %d\n",
1181                        mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
1182                 LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
1183                                 &o->mot_obj,
1184                                 "Object isn't on this server! FLD error?");
1185                 rc = -EFAULT;
1186         } else {
1187                 if (mdt_object_exists(o)) {
1188                         int mask;
1189
1190                         /* Do permission check for cross-open after converting
1191                          * MDS_OPEN_* flags to MAY_* permission mask.
1192                          */
1193                         mask = mds_accmode(open_flags);
1194
1195                         rc = mo_permission(info->mti_env, NULL,
1196                                            mdt_object_child(o), NULL, mask);
1197                         if (rc)
1198                                 goto out;
1199
1200                         mdt_prep_ma_buf_from_rep(info, o, ma);
1201                         rc = mdt_attr_get_complex(info, o, ma);
1202                         if (rc != 0)
1203                                 GOTO(out, rc);
1204
1205                         rc = mdt_pack_secctx_in_reply(info, o);
1206                         if (unlikely(rc))
1207                                 GOTO(out, rc);
1208
1209                         rc = mdt_pack_encctx_in_reply(info, o);
1210                         if (unlikely(rc))
1211                                 GOTO(out, rc);
1212
1213                         rc = mdt_finish_open(info, NULL, o, open_flags, rep);
1214                 } else {
1215                         /*
1216                          * Something is wrong here. lookup was positive but
1217                          * there is no object!
1218                          */
1219                         CERROR("%s: "DFID" doesn't exist!: rc = %d\n",
1220                               mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
1221                         rc = -EFAULT;
1222                 }
1223         }
1224 out:
1225         mdt_object_put(info->mti_env, o);
1226
1227         RETURN(rc);
1228 }
1229
1230 /*
1231  * find root object and take its xattr lock if it's on remote MDT, later create
1232  * may use fs default striping (which is stored in root xattr).
1233  */
1234 static int mdt_lock_root_xattr(struct mdt_thread_info *info,
1235                                struct mdt_device *mdt)
1236 {
1237         struct mdt_object *md_root = mdt->mdt_md_root;
1238         struct lustre_handle lhroot;
1239         int rc;
1240
1241         if (md_root == NULL) {
1242                 lu_root_fid(&info->mti_tmp_fid1);
1243                 md_root = mdt_object_find(info->mti_env, mdt,
1244                                           &info->mti_tmp_fid1);
1245                 if (IS_ERR(md_root))
1246                         return PTR_ERR(md_root);
1247
1248                 spin_lock(&mdt->mdt_lock);
1249                 if (mdt->mdt_md_root != NULL) {
1250                         spin_unlock(&mdt->mdt_lock);
1251
1252                         LASSERTF(mdt->mdt_md_root == md_root,
1253                                  "Different root object ("
1254                                  DFID") instances, %p, %p\n",
1255                                  PFID(&info->mti_tmp_fid1),
1256                                  mdt->mdt_md_root, md_root);
1257                         LASSERT(atomic_read(
1258                                 &md_root->mot_obj.lo_header->loh_ref) > 1);
1259
1260                         mdt_object_put(info->mti_env, md_root);
1261                 } else {
1262                         mdt->mdt_md_root = md_root;
1263                         spin_unlock(&mdt->mdt_lock);
1264                 }
1265         }
1266
1267         if (md_root->mot_cache_attr || !mdt_object_remote(md_root))
1268                 return 0;
1269
1270         rc = mdt_remote_object_lock(info, md_root, mdt_object_fid(md_root),
1271                                     &lhroot, LCK_PR, MDS_INODELOCK_XATTR,
1272                                     true);
1273         if (rc < 0)
1274                 return rc;
1275
1276         md_root->mot_cache_attr = 1;
1277
1278         /* don't cancel this lock, so that we know the cached xattr is valid. */
1279         ldlm_lock_decref(&lhroot, LCK_PR);
1280
1281         return 0;
1282 }
1283
1284 static inline enum ldlm_mode mdt_open_lock_mode(struct mdt_thread_info *info,
1285                                                 struct mdt_object *p,
1286                                                 struct lu_name *name,
1287                                                 u64 open_flags)
1288 {
1289         int result;
1290         struct lu_fid fid;
1291
1292         /* We don't need to take the DLM lock for a volatile */
1293         if (open_flags & MDS_OPEN_VOLATILE)
1294                 return LCK_NL;
1295
1296         if (!(open_flags & MDS_OPEN_CREAT))
1297                 return LCK_PR;
1298
1299         result = mdo_lookup(info->mti_env, mdt_object_child(p), name, &fid,
1300                             &info->mti_spec);
1301
1302         /* If the file exists we only need a read lock on the parent */
1303         return (result == 0) ? LCK_PR : LCK_PW;
1304 }
1305
1306 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
1307 {
1308         struct mdt_device *mdt = info->mti_mdt;
1309         struct ptlrpc_request *req = mdt_info_req(info);
1310         struct mdt_object *parent;
1311         struct mdt_object *child;
1312         struct mdt_lock_handle *lh = NULL;
1313         struct ldlm_reply *ldlm_rep;
1314         struct mdt_body *repbody;
1315         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1316         struct md_attr *ma = &info->mti_attr;
1317         u64 open_flags = info->mti_spec.sp_cr_flags;
1318         u64 ibits = 0;
1319         struct mdt_reint_record *rr = &info->mti_rr;
1320         int result, rc;
1321         int created = 0;
1322         int object_locked = 0;
1323         enum ldlm_mode lock_mode;
1324         u32 msg_flags;
1325         ktime_t kstart = ktime_get();
1326
1327         ENTRY;
1328         OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
1329                                (obd_timeout + 1) / 4);
1330
1331         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1332
1333         ma->ma_need = MA_INODE;
1334         ma->ma_valid = 0;
1335
1336         LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
1337         ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
1338
1339         if (unlikely(open_flags & MDS_OPEN_JOIN_FILE)) {
1340                 CERROR("file join is not supported anymore.\n");
1341                 GOTO(out, result = err_serious(-EOPNOTSUPP));
1342         }
1343         msg_flags = lustre_msg_get_flags(req->rq_reqmsg);
1344
1345         if ((open_flags & (MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS)) &&
1346             info->mti_spec.u.sp_ea.eadata == NULL)
1347                 GOTO(out, result = err_serious(-EINVAL));
1348
1349         if (open_flags & MDS_FMODE_WRITE &&
1350             exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
1351                 GOTO(out, result = -EROFS);
1352
1353         CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") "
1354                "cr_flag=%#llo mode=0%06o msg_flag=0x%x\n",
1355                PFID(rr->rr_fid1), PNAME(&rr->rr_name), PFID(rr->rr_fid2),
1356                open_flags, ma->ma_attr.la_mode, msg_flags);
1357
1358         if (info->mti_cross_ref) {
1359                 /* This is cross-ref open */
1360                 mdt_set_disposition(info, ldlm_rep,
1361                             (DISP_IT_EXECD | DISP_LOOKUP_EXECD |
1362                              DISP_LOOKUP_POS));
1363                 result = mdt_cross_open(info, rr->rr_fid2, rr->rr_fid1,
1364                                         ldlm_rep, open_flags);
1365                 GOTO(out, result);
1366         } else if (req_is_replay(req)) {
1367                 result = mdt_open_by_fid(info, ldlm_rep, lhc);
1368
1369                 if (result != -ENOENT)
1370                         GOTO(out, result);
1371
1372                 /* We didn't find the correct object, so we need to re-create it
1373                  * via a regular replay. */
1374                 if (!(open_flags & MDS_OPEN_CREAT)) {
1375                         DEBUG_REQ(D_ERROR, req,
1376                                   "OPEN & CREAT not in open replay/by_fid");
1377                         GOTO(out, result = -EFAULT);
1378                 }
1379                 CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
1380         } else if (open_flags & MDS_OPEN_BY_FID) {
1381                 result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
1382                 if (result < 0)
1383                         CDEBUG(D_INFO, "no object for "DFID": %d\n",
1384                                PFID(rr->rr_fid2), result);
1385                 GOTO(out, result);
1386         }
1387
1388         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
1389                 GOTO(out, result = err_serious(-ENOMEM));
1390
1391         mdt_set_disposition(info, ldlm_rep,
1392                             (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
1393
1394         if (!lu_name_is_valid(&rr->rr_name))
1395                 GOTO(out, result = -EPROTO);
1396
1397         result = mdt_lock_root_xattr(info, mdt);
1398         if (result < 0)
1399                 GOTO(out, result);
1400
1401         parent = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
1402         if (IS_ERR(parent))
1403                 GOTO(out, result = PTR_ERR(parent));
1404
1405         /* get and check version of parent */
1406         result = mdt_version_get_check(info, parent, 0);
1407         if (result) {
1408                 mdt_object_put(info->mti_env, parent);
1409                 GOTO(out, result);
1410         }
1411
1412         result = mdt_check_enc(info, parent);
1413         if (result)
1414                 GOTO(out_parent, result);
1415
1416         fid_zero(child_fid);
1417         result = -ENOENT;
1418         lock_mode = mdt_open_lock_mode(info, parent, &rr->rr_name, open_flags);
1419
1420         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1421 again_pw:
1422         if (lock_mode != LCK_NL) {
1423                 lh = &info->mti_lh[MDT_LH_PARENT];
1424                 mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name);
1425                 result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
1426                 if (result != 0)
1427                         GOTO(out_parent, result);
1428
1429                 result = mdo_lookup(info->mti_env, mdt_object_child(parent),
1430                                     &rr->rr_name, child_fid, &info->mti_spec);
1431         }
1432
1433         LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
1434                  "looking for "DFID"/"DNAME", found FID = "DFID"\n",
1435                  PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
1436                  PFID(child_fid));
1437
1438         if (result != 0 && result != -ENOENT)
1439                 GOTO(out_parent_unlock, result);
1440
1441         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1442
1443         if (result == -ENOENT) {
1444                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1445                 if (!(open_flags & MDS_OPEN_CREAT))
1446                         GOTO(out_parent_unlock, result);
1447                 if (mdt_rdonly(req->rq_export))
1448                         GOTO(out_parent_unlock, result = -EROFS);
1449
1450                 LASSERT(equi(lh == NULL, lock_mode == LCK_NL));
1451
1452                 if (lock_mode == LCK_PR) {
1453                         /* unlink vs create race: get write lock and restart */
1454                         mdt_object_unlock(info, parent, lh, 1);
1455                         mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1456                         mdt_lock_handle_init(lh);
1457                         lock_mode = LCK_PW;
1458                         goto again_pw;
1459                 }
1460
1461                 *child_fid = *info->mti_rr.rr_fid2;
1462                 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
1463                          PFID(child_fid));
1464                 /* In the function below, .hs_keycmp resolves to
1465                  * lu_obj_hop_keycmp() */
1466                 /* coverity[overrun-buffer-val] */
1467                 child = mdt_object_new(info->mti_env, mdt, child_fid);
1468         } else {
1469                 /*
1470                  * Check for O_EXCL is moved to the mdt_finish_open(),
1471                  * we need to return FID back in that case.
1472                  */
1473                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1474                 child = mdt_object_find(info->mti_env, mdt, child_fid);
1475         }
1476         if (IS_ERR(child))
1477                 GOTO(out_parent_unlock, result = PTR_ERR(child));
1478
1479         /** check version of child  */
1480         rc = mdt_version_get_check(info, child, 1);
1481         if (rc)
1482                 GOTO(out_child, result = rc);
1483
1484         tgt_open_obj_set(info->mti_env, mdt_obj2dt(child));
1485
1486         if (result == -ENOENT) {
1487                 /* Create under OBF and .lustre is not permitted */
1488                 if (!fid_is_md_operative(rr->rr_fid1) &&
1489                     (open_flags & MDS_OPEN_VOLATILE) == 0)
1490                         GOTO(out_child, result = -EPERM);
1491
1492                 /* save versions in reply */
1493                 mdt_version_get_save(info, parent, 0);
1494                 mdt_version_get_save(info, child, 1);
1495
1496                 /* version of child will be changed */
1497                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
1498
1499                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
1500                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
1501
1502                 /* Don't do lookup sanity check. We know name doesn't exist. */
1503                 info->mti_spec.sp_cr_lookup = 0;
1504                 info->mti_spec.sp_feat = &dt_directory_features;
1505
1506                 result = mdo_create(info->mti_env, mdt_object_child(parent),
1507                                     &rr->rr_name, mdt_object_child(child),
1508                                     &info->mti_spec, &info->mti_attr);
1509                 if (result == -ERESTART) {
1510                         mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
1511                         GOTO(out_child, result);
1512                 } else {
1513                         mdt_prep_ma_buf_from_rep(info, child, ma);
1514                         /* XXX: we should call this once, see few lines below */
1515                         if (result == 0)
1516                                 result = mdt_attr_get_complex(info, child, ma);
1517
1518                         if (result != 0)
1519                                 GOTO(out_child, result);
1520                 }
1521                 created = 1;
1522                 mdt_counter_incr(req, LPROC_MDT_MKNOD,
1523                                  ktime_us_delta(ktime_get(), kstart));
1524         } else {
1525                 /*
1526                  * The object is on remote node, return its FID for remote open.
1527                  */
1528                 if (mdt_object_remote(child)) {
1529                         /*
1530                          * Check if this lock already was sent to client and
1531                          * this is resent case. For resent case do not take lock
1532                          * again, use what is already granted.
1533                          */
1534                         LASSERT(lhc != NULL);
1535
1536                         rc = mdt_check_resent_lock(info, child, lhc);
1537                         if (rc < 0) {
1538                                 GOTO(out_child, result = rc);
1539                         } else if (rc > 0) {
1540                                 mdt_lock_handle_init(lhc);
1541                                 mdt_lock_reg_init(lhc, LCK_PR);
1542
1543                                 rc = mdt_object_lock(info, child, lhc,
1544                                                      MDS_INODELOCK_LOOKUP);
1545                         }
1546                         repbody->mbo_fid1 = *mdt_object_fid(child);
1547                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1548                         if (rc != 0)
1549                                 result = rc;
1550                         else
1551                                 result = -MDT_EREMOTE_OPEN;
1552                         GOTO(out_child, result);
1553                 } else if (mdt_object_exists(child)) {
1554                         /* Check early for MDS_OPEN_DIRECTORY/O_DIRECTORY to
1555                          * avoid opening regular files from lfs getstripe
1556                          * since doing so breaks the leases used by lfs
1557                          * mirror. See LU-13693. */
1558                         if (open_flags & MDS_OPEN_DIRECTORY &&
1559                             S_ISREG(lu_object_attr(&child->mot_obj)))
1560                                 GOTO(out_child, result = -ENOTDIR);
1561
1562                         /* We have to get attr & LOV EA & HSM for this
1563                          * object. */
1564                         mdt_prep_ma_buf_from_rep(info, child, ma);
1565                         ma->ma_need |= MA_HSM;
1566                         result = mdt_attr_get_complex(info, child, ma);
1567                         if (result != 0)
1568                                 GOTO(out_child, result);
1569                 } else {
1570                         /* Object does not exist. Likely FS corruption. */
1571                         CERROR("%s: name '"DNAME"' present, but FID "
1572                                DFID" is invalid\n", mdt_obd_name(info->mti_mdt),
1573                                PNAME(&rr->rr_name), PFID(child_fid));
1574                         GOTO(out_child, result = -EIO);
1575                 }
1576         }
1577
1578         repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1579         repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1580
1581         rc = mdt_pack_secctx_in_reply(info, child);
1582         if (unlikely(rc))
1583                 GOTO(out_child, result = rc);
1584
1585         rc = mdt_pack_encctx_in_reply(info, child);
1586         if (unlikely(rc))
1587                 GOTO(out_child, result = rc);
1588
1589         rc = mdt_check_resent_lock(info, child, lhc);
1590         if (rc < 0) {
1591                 GOTO(out_child, result = rc);
1592         } else if (rc == 0) {
1593                 /* the open lock might already be gotten in
1594                  * ldlm_handle_enqueue() */
1595                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
1596                 if (open_flags & MDS_OPEN_LOCK)
1597                         mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
1598         } else {
1599                 /* get openlock if this isn't replay and client requested it */
1600                 if (!req_is_replay(req)) {
1601                         rc = mdt_object_open_lock(info, child, lhc, &ibits);
1602                         object_locked = 1;
1603                         if (rc != 0)
1604                                 GOTO(out_child_unlock, result = rc);
1605                         else if (open_flags & MDS_OPEN_LOCK)
1606                                 mdt_set_disposition(info, ldlm_rep,
1607                                                     DISP_OPEN_LOCK);
1608                 }
1609         }
1610         /* Try to open it now. */
1611         rc = mdt_finish_open(info, parent, child, open_flags, ldlm_rep);
1612         if (rc) {
1613                 result = rc;
1614                 /* openlock will be released if mdt_finish_open() failed */
1615                 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
1616
1617                 if (created && (open_flags & MDS_OPEN_VOLATILE)) {
1618                         CERROR("%s: cannot open volatile file "DFID", orphan "
1619                                "file will be left in PENDING directory until "
1620                                "next reboot, rc = %d\n", mdt_obd_name(mdt),
1621                                PFID(mdt_object_fid(child)), rc);
1622                         GOTO(out_child_unlock, result);
1623                 }
1624
1625                 if (created) {
1626                         ma->ma_need = 0;
1627                         ma->ma_valid = 0;
1628                         rc = mdo_unlink(info->mti_env,
1629                                         mdt_object_child(parent),
1630                                         mdt_object_child(child),
1631                                         &rr->rr_name,
1632                                         &info->mti_attr, 0);
1633                         if (rc != 0)
1634                                 CERROR("%s: "DFID" cleanup of open: rc = %d\n",
1635                                        mdt_obd_name(info->mti_mdt),
1636                                        PFID(mdt_object_fid(child)), rc);
1637                         mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
1638                 }
1639         }
1640
1641         mdt_counter_incr(req, LPROC_MDT_OPEN,
1642                          ktime_us_delta(ktime_get(), kstart));
1643
1644         EXIT;
1645 out_child_unlock:
1646         if (object_locked)
1647                 mdt_object_open_unlock(info, child, lhc, ibits, result);
1648 out_child:
1649         mdt_object_put(info->mti_env, child);
1650         if (result == 0)
1651                 mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh);
1652 out_parent_unlock:
1653         if (lh != NULL)
1654                 mdt_object_unlock(info, parent, lh, result || !created);
1655
1656 out_parent:
1657         mdt_object_put(info->mti_env, parent);
1658 out:
1659         if (result)
1660                 lustre_msg_set_transno(req->rq_repmsg, 0);
1661         return result;
1662 }
1663
1664 /**
1665  * Create an orphan object use local root.
1666  */
1667 static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
1668                                           struct mdt_device *mdt,
1669                                           const struct lu_fid *fid,
1670                                           struct md_attr *attr, fmode_t fmode)
1671 {
1672         const struct lu_env *env = info->mti_env;
1673         struct md_op_spec *spec = &info->mti_spec;
1674         struct lu_fid *local_root_fid = &info->mti_tmp_fid1;
1675         struct mdt_object *obj = NULL;
1676         struct mdt_object *local_root;
1677         static const struct lu_name lname = {
1678                 .ln_name = "i_am_nobody",
1679                 .ln_namelen = sizeof("i_am_nobody") - 1,
1680         };
1681         struct lu_ucred *uc;
1682         kernel_cap_t uc_cap_save;
1683         int rc;
1684
1685         ENTRY;
1686         rc = dt_root_get(env, mdt->mdt_bottom, local_root_fid);
1687         if (rc != 0)
1688                 RETURN(ERR_PTR(rc));
1689
1690         local_root = mdt_object_find(env, mdt, local_root_fid);
1691         if (IS_ERR(local_root))
1692                 RETURN(local_root);
1693
1694         obj = mdt_object_new(env, mdt, fid);
1695         if (IS_ERR(obj))
1696                 GOTO(out, rc = PTR_ERR(obj));
1697
1698         spec->sp_cr_lookup = 0;
1699         spec->sp_feat = &dt_directory_features;
1700         spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
1701         if (attr->ma_valid & MA_LOV) {
1702                 spec->u.sp_ea.eadata = attr->ma_lmm;
1703                 spec->u.sp_ea.eadatalen = attr->ma_lmm_size;
1704                 spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1705         } else {
1706                 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
1707         }
1708
1709         uc = lu_ucred(env);
1710         uc_cap_save = uc->uc_cap;
1711         cap_raise(uc->uc_cap, CAP_DAC_OVERRIDE);
1712         rc = mdo_create(env, mdt_object_child(local_root), &lname,
1713                         mdt_object_child(obj), spec, attr);
1714         uc->uc_cap = uc_cap_save;
1715         if (rc < 0) {
1716                 CERROR("%s: cannot create volatile file "DFID": rc = %d\n",
1717                        mdt_obd_name(mdt), PFID(fid), rc);
1718                 GOTO(out, rc);
1719         }
1720
1721         rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED, spec);
1722         if (rc < 0)
1723                 CERROR("%s: cannot open volatile file "DFID", orphan "
1724                        "file will be left in PENDING directory until "
1725                        "next reboot, rc = %d\n", mdt_obd_name(mdt),
1726                        PFID(fid), rc);
1727         GOTO(out, rc);
1728
1729 out:
1730         if (rc < 0) {
1731                 if (!IS_ERR(obj))
1732                         mdt_object_put(env, obj);
1733                 obj = ERR_PTR(rc);
1734         }
1735         mdt_object_put(env, local_root);
1736         return obj;
1737 }
1738
1739 /* XXX Look into layout in MDT layer. */
1740 static inline int mdt_hsm_set_released(struct lov_mds_md *lmm)
1741 {
1742         struct lov_comp_md_v1 *comp_v1;
1743         struct lov_mds_md *v1;
1744         __u32 off;
1745         int i;
1746
1747         if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_COMP_V1_DEFINED)) {
1748                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
1749
1750                 if (comp_v1->lcm_entry_count == 0)
1751                         return -EINVAL;
1752
1753                 for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
1754                         off = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
1755                         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
1756                         v1->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
1757                 }
1758         } else {
1759                 lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
1760         }
1761         return 0;
1762 }
1763
1764 static inline int mdt_get_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
1765 {
1766         struct lov_comp_md_v1 *comp_v1;
1767
1768         if (le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_COMP_V1)) {
1769                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
1770                 *gen = le32_to_cpu(comp_v1->lcm_layout_gen);
1771         } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1772                    le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) {
1773                 *gen = le16_to_cpu(lmm->lmm_layout_gen);
1774         } else {
1775                 return -EINVAL;
1776         }
1777         return 0;
1778 }
1779
1780 static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
1781                            struct md_attr *ma)
1782 {
1783         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LAYOUT];
1784         struct lu_ucred *uc = mdt_ucred(info);
1785         struct close_data *data;
1786         struct ldlm_lock *lease;
1787         struct mdt_object *orphan;
1788         struct md_attr *orp_ma;
1789         struct lu_buf *buf;
1790         kernel_cap_t cap;
1791         bool lease_broken;
1792         int rc;
1793         int rc2;
1794
1795         ENTRY;
1796         if (mdt_rdonly(info->mti_exp))
1797                 RETURN(-EROFS);
1798
1799         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
1800         if (data == NULL)
1801                 RETURN(-EPROTO);
1802
1803         lease = ldlm_handle2lock(&data->cd_handle);
1804         if (lease == NULL)
1805                 RETURN(-ESTALE);
1806
1807         /* try to hold open_sem so that nobody else can open the file */
1808         if (!down_write_trylock(&o->mot_open_sem)) {
1809                 ldlm_lock_cancel(lease);
1810                 GOTO(out_reprocess, rc = -EBUSY);
1811         }
1812
1813         /* Check if the lease open lease has already canceled */
1814         lock_res_and_lock(lease);
1815         lease_broken = ldlm_is_cancel(lease);
1816         unlock_res_and_lock(lease);
1817
1818         LDLM_DEBUG(lease, DFID " lease broken? %d",
1819                    PFID(mdt_object_fid(o)), lease_broken);
1820
1821         /* Cancel server side lease. Client side counterpart should
1822          * have been cancelled. It's okay to cancel it now as we've
1823          * held mot_open_sem. */
1824         ldlm_lock_cancel(lease);
1825
1826         if (lease_broken) /* don't perform release task */
1827                 GOTO(out_unlock, rc = -ESTALE);
1828
1829         if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
1830                 GOTO(out_unlock, rc = -EINVAL);
1831
1832         /* ma_need was set before but it seems fine to change it in order to
1833          * avoid modifying the one from RPC */
1834         ma->ma_need = MA_HSM;
1835         rc = mdt_attr_get_complex(info, o, ma);
1836         if (rc != 0)
1837                 GOTO(out_unlock, rc);
1838
1839         if (ma->ma_attr_flags & MDS_PCC_ATTACH) {
1840                 if (ma->ma_valid & MA_HSM) {
1841                         if (ma->ma_hsm.mh_flags & HS_RELEASED)
1842                                 GOTO(out_unlock, rc = -EALREADY);
1843
1844                         if (ma->ma_hsm.mh_arch_id != data->cd_archive_id)
1845                                 CDEBUG(D_CACHE,
1846                                        DFID" archive id diff: %llu:%u\n",
1847                                        PFID(mdt_object_fid(o)),
1848                                        ma->ma_hsm.mh_arch_id,
1849                                        data->cd_archive_id);
1850
1851                         if (!(ma->ma_hsm.mh_flags & HS_DIRTY) &&
1852                             ma->ma_hsm.mh_arch_ver == data->cd_data_version) {
1853                                 CDEBUG(D_CACHE,
1854                                        DFID" data version matches: packed=%llu "
1855                                        "and on-disk=%llu\n",
1856                                        PFID(mdt_object_fid(o)),
1857                                        data->cd_data_version,
1858                                        ma->ma_hsm.mh_arch_ver);
1859                                 ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
1860                         }
1861
1862                         if (ma->ma_hsm.mh_flags & HS_DIRTY)
1863                                 ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
1864                 } else {
1865                         /* Set up HSM attribte for PCC archived object */
1866                         BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1867                                      sizeof(info->mti_xattr_buf));
1868                         buf = &info->mti_buf;
1869                         buf->lb_buf = info->mti_xattr_buf;
1870                         buf->lb_len = sizeof(struct hsm_attrs);
1871                         memset(&ma->ma_hsm, 0, sizeof(ma->ma_hsm));
1872                         ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
1873                         ma->ma_hsm.mh_arch_id = data->cd_archive_id;
1874                         ma->ma_hsm.mh_arch_ver = data->cd_data_version;
1875                         lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
1876
1877                         rc = mo_xattr_set(info->mti_env, mdt_object_child(o),
1878                                           buf, XATTR_NAME_HSM, 0);
1879                         if (rc)
1880                                 GOTO(out_unlock, rc);
1881                 }
1882         } else {
1883                 if (!mdt_hsm_release_allow(ma))
1884                         GOTO(out_unlock, rc = -EPERM);
1885
1886                 /* already released? */
1887                 if (ma->ma_hsm.mh_flags & HS_RELEASED)
1888                         GOTO(out_unlock, rc = 0);
1889
1890                 /* Compare on-disk and packed data_version */
1891                 if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
1892                         CDEBUG(D_HSM, DFID" data_version mismatches: "
1893                                "packed=%llu and on-disk=%llu\n",
1894                                PFID(mdt_object_fid(o)),
1895                                data->cd_data_version,
1896                                ma->ma_hsm.mh_arch_ver);
1897                         GOTO(out_unlock, rc = -EPERM);
1898                 }
1899         }
1900
1901         ma->ma_valid = MA_INODE;
1902         ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME | LA_SIZE;
1903         rc = mo_attr_set(info->mti_env, mdt_object_child(o), ma);
1904         if (rc < 0)
1905                 GOTO(out_unlock, rc);
1906
1907         mutex_lock(&o->mot_som_mutex);
1908         rc2 = mdt_set_som(info, o, SOM_FL_STRICT, ma->ma_attr.la_size,
1909                            ma->ma_attr.la_blocks);
1910         mutex_unlock(&o->mot_som_mutex);
1911         if (rc2 < 0)
1912                 CDEBUG(D_INODE,
1913                        "%s: File "DFID" SOM update failed: rc = %d\n",
1914                        mdt_obd_name(info->mti_mdt),
1915                        PFID(mdt_object_fid(o)), rc2);
1916
1917
1918         ma->ma_need = MA_INODE | MA_LOV;
1919         rc = mdt_attr_get_complex(info, o, ma);
1920         if (rc < 0)
1921                 GOTO(out_unlock, rc);
1922
1923         if (!(ma->ma_valid & MA_LOV)) {
1924                 /* Even empty file are released */
1925                 memset(ma->ma_lmm, 0, sizeof(*ma->ma_lmm));
1926                 ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEFINED);
1927                 ma->ma_lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
1928                 ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE);
1929                 ma->ma_lmm_size = sizeof(*ma->ma_lmm);
1930         } else {
1931                 /* Magic must be LOV_MAGIC_*_DEFINED or LOD will interpret
1932                  * ma_lmm as lov_user_md, then it will be confused by union of
1933                  * layout_gen and stripe_offset. */
1934                 if ((le32_to_cpu(ma->ma_lmm->lmm_magic) & LOV_MAGIC_MASK) ==
1935                     LOV_MAGIC_MAGIC)
1936                         ma->ma_lmm->lmm_magic |= cpu_to_le32(LOV_MAGIC_DEFINED);
1937                 else
1938                         GOTO(out_unlock, rc = -EINVAL);
1939         }
1940
1941         /* Set file as released. */
1942         rc = mdt_hsm_set_released(ma->ma_lmm);
1943         if (rc)
1944                 GOTO(out_unlock, rc);
1945
1946         orp_ma = &info->mti_u.hsm.attr;
1947         orp_ma->ma_attr.la_mode = S_IFREG | S_IWUSR;
1948         /* We use root ownership to bypass potential quota
1949          * restrictions on the user and group of the file. */
1950         orp_ma->ma_attr.la_uid = 0;
1951         orp_ma->ma_attr.la_gid = 0;
1952         orp_ma->ma_attr.la_valid = LA_MODE | LA_UID | LA_GID;
1953         orp_ma->ma_lmm = ma->ma_lmm;
1954         orp_ma->ma_lmm_size = ma->ma_lmm_size;
1955         orp_ma->ma_valid = MA_INODE | MA_LOV;
1956         orphan = mdt_orphan_open(info, info->mti_mdt, &data->cd_fid, orp_ma,
1957                                  MDS_FMODE_WRITE);
1958         if (IS_ERR(orphan)) {
1959                 CERROR("%s: cannot open orphan file "DFID": rc = %ld\n",
1960                        mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid),
1961                        PTR_ERR(orphan));
1962                 GOTO(out_unlock, rc = PTR_ERR(orphan));
1963         }
1964
1965         /* Set up HSM attribute for orphan object */
1966         BUILD_BUG_ON(sizeof(struct hsm_attrs) > sizeof(info->mti_xattr_buf));
1967         buf = &info->mti_buf;
1968         buf->lb_buf = info->mti_xattr_buf;
1969         buf->lb_len = sizeof(struct hsm_attrs);
1970         ma->ma_hsm.mh_flags |= HS_RELEASED;
1971         lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
1972         ma->ma_hsm.mh_flags &= ~HS_RELEASED;
1973
1974         mdt_lock_reg_init(lh, LCK_EX);
1975         rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
1976                              MDS_INODELOCK_XATTR);
1977         if (rc != 0)
1978                 GOTO(out_close, rc);
1979
1980         /* The orphan has root ownership so we need to raise
1981          * CAP_FOWNER to set the HSM attributes. */
1982         cap = uc->uc_cap;
1983         cap_raise(uc->uc_cap, CAP_FOWNER);
1984         rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
1985                           XATTR_NAME_HSM, 0);
1986         uc->uc_cap = cap;
1987         if (rc != 0)
1988                 GOTO(out_layout_lock, rc);
1989
1990         /* Swap layout with orphan objects. */
1991         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
1992                              mdt_object_child(orphan),
1993                              SWAP_LAYOUTS_MDS_HSM);
1994
1995         if (!rc && ma->ma_attr_flags & MDS_PCC_ATTACH) {
1996                 ma->ma_need = MA_LOV;
1997                 rc = mdt_attr_get_complex(info, o, ma);
1998         }
1999
2000         EXIT;
2001
2002 out_layout_lock:
2003         /* Release exclusive LL */
2004         mdt_object_unlock(info, o, lh, 1);
2005 out_close:
2006         /* Close orphan object anyway */
2007         rc2 = mo_close(info->mti_env, mdt_object_child(orphan), orp_ma,
2008                        MDS_FMODE_WRITE);
2009         if (rc2 < 0)
2010                 CERROR("%s: error closing volatile file "DFID": rc = %d\n",
2011                        mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
2012         LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
2013                         "object closed");
2014         mdt_object_put(info->mti_env, orphan);
2015
2016 out_unlock:
2017         up_write(&o->mot_open_sem);
2018
2019         /* already released */
2020         if (rc == 0) {
2021                 struct mdt_body *repbody;
2022
2023                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2024                 LASSERT(repbody != NULL);
2025                 repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2026                 if (ma->ma_attr_flags & MDS_PCC_ATTACH) {
2027                         LASSERT(ma->ma_valid & MA_LOV);
2028                         rc = mdt_get_lmm_gen(ma->ma_lmm,
2029                                              &repbody->mbo_layout_gen);
2030                         if (!rc)
2031                                 repbody->mbo_valid |= OBD_MD_LAYOUT_VERSION;
2032                 }
2033         }
2034
2035 out_reprocess:
2036         ldlm_reprocess_all(lease->l_resource,
2037                            lease->l_policy_data.l_inodebits.bits);
2038         LDLM_LOCK_PUT(lease);
2039
2040         ma->ma_valid = 0;
2041         ma->ma_need = 0;
2042
2043         return rc;
2044 }
2045
2046 int mdt_close_handle_layouts(struct mdt_thread_info *info,
2047                              struct mdt_object *o, struct md_attr *ma)
2048 {
2049         struct mdt_lock_handle *lh1 = &info->mti_lh[MDT_LH_NEW];
2050         struct mdt_lock_handle *lh2 = &info->mti_lh[MDT_LH_OLD];
2051         struct close_data *data;
2052         struct ldlm_lock *lease;
2053         struct mdt_object *o1 = o, *o2 = NULL;
2054         bool lease_broken;
2055         bool swap_objects = false;
2056         int rc;
2057
2058         ENTRY;
2059         if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
2060                 RETURN(-EROFS);
2061
2062         if (!S_ISREG(lu_object_attr(&o1->mot_obj)))
2063                 RETURN(-EINVAL);
2064
2065         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2066         if (data == NULL)
2067                 RETURN(-EPROTO);
2068
2069         if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
2070                 RETURN(-EINVAL);
2071
2072         rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
2073         if (rc == 0) {
2074                 /**
2075                  * only MDS_CLOSE_LAYOUT_SPLIT use the same fid to indicate
2076                  * mirror deletion, so we'd zero cd_fid, and keeps o2 be NULL.
2077                  */
2078                 if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT))
2079                         RETURN(-EINVAL);
2080
2081                 /* zero cd_fid to keeps o2 be NULL */
2082                 fid_zero(&data->cd_fid);
2083         } else if (rc < 0) {
2084                 /* Exchange o1 and o2, to enforce locking order */
2085                 swap_objects = true;
2086         }
2087
2088         lease = ldlm_handle2lock(&data->cd_handle);
2089         if (lease == NULL)
2090                 RETURN(-ESTALE);
2091
2092         if (!fid_is_zero(&data->cd_fid)) {
2093                 o2 = mdt_object_find(info->mti_env, info->mti_mdt,
2094                                      &data->cd_fid);
2095                 if (IS_ERR(o2))
2096                         GOTO(out_lease, rc = PTR_ERR(o2));
2097
2098                 if (!mdt_object_exists(o2))
2099                         GOTO(out_obj, rc = -ENOENT);
2100
2101                 if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
2102                         GOTO(out_obj, rc = -EINVAL);
2103
2104                 if (swap_objects)
2105                         swap(o1, o2);
2106         }
2107
2108         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
2109                            MAY_WRITE);
2110         if (rc < 0)
2111                 GOTO(out_obj, rc);
2112
2113         if (o2) {
2114                 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2),
2115                                    NULL, MAY_WRITE);
2116                 if (rc < 0)
2117                         GOTO(out_obj, rc);
2118         }
2119
2120         /* try to hold open_sem so that nobody else can open the file */
2121         if (!down_write_trylock(&o->mot_open_sem)) {
2122                 ldlm_lock_cancel(lease);
2123                 GOTO(out_obj, rc = -EBUSY);
2124         }
2125
2126         /* Check if the lease open lease has already canceled */
2127         lock_res_and_lock(lease);
2128         lease_broken = ldlm_is_cancel(lease);
2129         unlock_res_and_lock(lease);
2130
2131         LDLM_DEBUG(lease, DFID " lease broken? %d",
2132                    PFID(mdt_object_fid(o)), lease_broken);
2133
2134         /* Cancel server side lease. Client side counterpart should
2135          * have been cancelled. It's okay to cancel it now as we've
2136          * held mot_open_sem. */
2137         ldlm_lock_cancel(lease);
2138
2139         if (lease_broken)
2140                 GOTO(out_unlock_sem, rc = -ESTALE);
2141
2142         mdt_lock_reg_init(lh1, LCK_EX);
2143         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
2144                              MDS_INODELOCK_XATTR);
2145         if (rc < 0)
2146                 GOTO(out_unlock_sem, rc);
2147
2148         if (o2) {
2149                 mdt_lock_reg_init(lh2, LCK_EX);
2150                 rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
2151                                      MDS_INODELOCK_XATTR);
2152                 if (rc < 0)
2153                         GOTO(out_unlock1, rc);
2154         }
2155
2156         /* Swap layout with orphan object */
2157         if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
2158                 rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
2159                                      mdt_object_child(o2), 0);
2160         } else if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_MERGE ||
2161                    ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT) {
2162                 struct lu_buf *buf = &info->mti_buf;
2163                 struct md_rejig_data mrd;
2164
2165                 if (o2) {
2166                         mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
2167                 } else {
2168                         if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)) {
2169                                 /* paranoid check again */
2170                                 CERROR(DFID
2171                                   ":only mirror split support NULL o2 object\n",
2172                                         PFID(mdt_object_fid(o)));
2173                                 GOTO(out_unlock1, rc = -EINVAL);
2174                         }
2175
2176                         /* set NULL mrd_obj for deleting mirror objects */
2177                         mrd.mrd_obj = NULL;
2178                 }
2179
2180                 if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT) {
2181                         mrd.mrd_mirror_id = data->cd_mirror_id;
2182                         /* set a small enough blocks in the SoM */
2183                         ma->ma_attr.la_blocks >>= 1;
2184                 }
2185
2186                 buf->lb_len = sizeof(mrd);
2187                 buf->lb_buf = &mrd;
2188                 rc = mo_xattr_set(info->mti_env, mdt_object_child(o), buf,
2189                                   XATTR_LUSTRE_LOV,
2190                                   ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT ?
2191                                   LU_XATTR_SPLIT : LU_XATTR_MERGE);
2192                 if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS |
2193                                                        LA_LSIZE | LA_LBLOCKS)) {
2194                         int rc2;
2195                         enum lustre_som_flags lsf;
2196
2197                         if (ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS))
2198                                 lsf = SOM_FL_STRICT;
2199                         else
2200                                 lsf = SOM_FL_LAZY;
2201
2202                         mutex_lock(&o->mot_som_mutex);
2203                         rc2 = mdt_set_som(info, o, lsf,
2204                                           ma->ma_attr.la_size,
2205                                           ma->ma_attr.la_blocks);
2206                         mutex_unlock(&o->mot_som_mutex);
2207                         if (rc2 < 0)
2208                                 CERROR(DFID": Setting i_blocks error: %d, "
2209                                        "i_blocks will be reported wrongly and "
2210                                        "can only be fixed in next resync\n",
2211                                        PFID(mdt_object_fid(o)), rc2);
2212                 }
2213         }
2214         if (rc < 0)
2215                 GOTO(out_unlock2, rc);
2216
2217         EXIT;
2218
2219 out_unlock2:
2220         /* Release exclusive LL */
2221         if (o2)
2222                 mdt_object_unlock(info, o2, lh2, 1);
2223
2224 out_unlock1:
2225         mdt_object_unlock(info, o1, lh1, 1);
2226
2227 out_unlock_sem:
2228         up_write(&o->mot_open_sem);
2229
2230         /* already swapped */
2231         if (rc == 0) {
2232                 struct mdt_body *repbody;
2233
2234                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2235                 LASSERT(repbody != NULL);
2236                 repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2237         }
2238
2239 out_obj:
2240         if (o1 != o)
2241                 /* the 2nd object has been used, and swapped to o1 */
2242                 mdt_object_put(info->mti_env, o1);
2243         else if (o2)
2244                 /* the 2nd object has been used, and not swapped */
2245                 mdt_object_put(info->mti_env, o2);
2246
2247         ldlm_reprocess_all(lease->l_resource,
2248                            lease->l_policy_data.l_inodebits.bits);
2249
2250 out_lease:
2251         LDLM_LOCK_PUT(lease);
2252
2253         if (ma != NULL) {
2254                 ma->ma_valid = 0;
2255                 ma->ma_need = 0;
2256         }
2257
2258         return rc;
2259 }
2260
2261 static int mdt_close_resync_done(struct mdt_thread_info *info,
2262                                  struct mdt_object *o, struct md_attr *ma)
2263 {
2264         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LOCAL];
2265         struct close_data *data;
2266         struct ldlm_lock *lease;
2267         struct md_layout_change layout = { 0 };
2268         __u32 *resync_ids = NULL;
2269         size_t resync_count = 0;
2270         bool lease_broken;
2271         int rc;
2272
2273         ENTRY;
2274         if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
2275                 RETURN(-EROFS);
2276
2277         if (!S_ISREG(lu_object_attr(&o->mot_obj)))
2278                 RETURN(-EINVAL);
2279
2280         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2281         if (data == NULL)
2282                 RETURN(-EPROTO);
2283
2284         if (req_capsule_req_need_swab(info->mti_pill))
2285                 lustre_swab_close_data_resync_done(&data->cd_resync);
2286
2287         if (!fid_is_zero(&data->cd_fid))
2288                 RETURN(-EPROTO);
2289
2290         lease = ldlm_handle2lock(&data->cd_handle);
2291         if (lease == NULL)
2292                 RETURN(-ESTALE);
2293
2294         /* try to hold open_sem so that nobody else can open the file */
2295         if (!down_write_trylock(&o->mot_open_sem)) {
2296                 ldlm_lock_cancel(lease);
2297                 GOTO(out_reprocess, rc = -EBUSY);
2298         }
2299
2300         /* Check if the lease open lease has already canceled */
2301         lock_res_and_lock(lease);
2302         lease_broken = ldlm_is_cancel(lease);
2303         unlock_res_and_lock(lease);
2304
2305         LDLM_DEBUG(lease, DFID " lease broken? %d\n",
2306                    PFID(mdt_object_fid(o)), lease_broken);
2307
2308         /* Cancel server side lease. Client side counterpart should
2309          * have been cancelled. It's okay to cancel it now as we've
2310          * held mot_open_sem. */
2311         ldlm_lock_cancel(lease);
2312
2313         if (lease_broken) /* don't perform release task */
2314                 GOTO(out_unlock, rc = -ESTALE);
2315
2316         resync_count = data->cd_resync.resync_count;
2317
2318         if (resync_count > INLINE_RESYNC_ARRAY_SIZE) {
2319                 void *data;
2320
2321                 if (!req_capsule_has_field(info->mti_pill, &RMF_U32,
2322                                            RCL_CLIENT))
2323                         GOTO(out_unlock, rc = -EPROTO);
2324
2325                 OBD_ALLOC_PTR_ARRAY(resync_ids, resync_count);
2326                 if (!resync_ids)
2327                         GOTO(out_unlock, rc = -ENOMEM);
2328
2329                 data = req_capsule_client_get(info->mti_pill, &RMF_U32);
2330                 memcpy(resync_ids, data, resync_count * sizeof(__u32));
2331
2332                 layout.mlc_resync_ids = resync_ids;
2333         } else {
2334                 layout.mlc_resync_ids = data->cd_resync.resync_ids_inline;
2335         }
2336
2337         layout.mlc_opc = MD_LAYOUT_RESYNC_DONE;
2338         layout.mlc_resync_count = resync_count;
2339         if (ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS)) {
2340                 layout.mlc_som.lsa_valid = SOM_FL_STRICT;
2341                 layout.mlc_som.lsa_size = ma->ma_attr.la_size;
2342                 layout.mlc_som.lsa_blocks = ma->ma_attr.la_blocks;
2343         }
2344         rc = mdt_layout_change(info, o, lhc, &layout);
2345         if (rc)
2346                 GOTO(out_unlock, rc);
2347
2348         mdt_object_unlock(info, o, lhc, 0);
2349
2350         EXIT;
2351
2352 out_unlock:
2353         up_write(&o->mot_open_sem);
2354
2355         /* already released */
2356         if (rc == 0) {
2357                 struct mdt_body *repbody;
2358
2359                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2360                 LASSERT(repbody != NULL);
2361                 repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2362         }
2363
2364         if (resync_ids)
2365                 OBD_FREE_PTR_ARRAY(resync_ids, resync_count);
2366
2367 out_reprocess:
2368         ldlm_reprocess_all(lease->l_resource,
2369                            lease->l_policy_data.l_inodebits.bits);
2370         LDLM_LOCK_PUT(lease);
2371
2372         ma->ma_valid = 0;
2373         ma->ma_need = 0;
2374
2375         return rc;
2376 }
2377
2378 #define MFD_CLOSED(open_flags) ((open_flags) == MDS_FMODE_CLOSED)
2379
2380 static int mdt_mfd_closed(struct mdt_file_data *mfd)
2381 {
2382         return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_open_flags));
2383 }
2384
2385 int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
2386 {
2387         struct mdt_object *o = mfd->mfd_object;
2388         struct md_object *next = mdt_object_child(o);
2389         struct md_attr *ma = &info->mti_attr;
2390         struct lu_fid *ofid = &info->mti_tmp_fid1;
2391         int rc = 0;
2392         int rc2;
2393         u64 open_flags;
2394         u64 intent;
2395
2396         ENTRY;
2397         open_flags = mfd->mfd_open_flags;
2398         intent = ma->ma_attr_flags & MDS_CLOSE_INTENT;
2399         *ofid = *mdt_object_fid(o);
2400
2401         /* the below message is checked in replay-single.sh test_46 */
2402         CDEBUG(D_INODE, "%s: %sclosing file handle "DFID" with intent: %llx\n",
2403                mdt_obd_name(info->mti_mdt),
2404                ma->ma_valid & MA_FORCE_LOG ? "force " : "", PFID(ofid), intent);
2405
2406         switch (intent) {
2407         case MDS_HSM_RELEASE: {
2408                 rc = mdt_hsm_release(info, o, ma);
2409                 if (rc < 0) {
2410                         CDEBUG(D_HSM, "%s: File " DFID " release failed: %d\n",
2411                                mdt_obd_name(info->mti_mdt),
2412                                PFID(ofid), rc);
2413                         /* continue to close even error occurred. */
2414                 }
2415                 break;
2416         }
2417         case MDS_CLOSE_LAYOUT_MERGE:
2418         case MDS_CLOSE_LAYOUT_SPLIT:
2419         case MDS_CLOSE_LAYOUT_SWAP: {
2420                 rc = mdt_close_handle_layouts(info, o, ma);
2421                 if (rc < 0) {
2422                         CDEBUG(D_INODE,
2423                                "%s: cannot %s layout of "DFID": rc = %d\n",
2424                                mdt_obd_name(info->mti_mdt),
2425                                intent == MDS_CLOSE_LAYOUT_MERGE ? "merge" :
2426                                intent == MDS_CLOSE_LAYOUT_SPLIT ? "split" :
2427                                "swap",
2428                                PFID(ofid), rc);
2429                         /* continue to close even if error occurred. */
2430                 }
2431                 break;
2432         }
2433         case MDS_CLOSE_RESYNC_DONE:
2434                 rc = mdt_close_resync_done(info, o, ma);
2435                 if (rc < 0) {
2436                         CDEBUG(D_INODE,
2437                                "%s: cannot resync layout of "DFID": rc = %d\n",
2438                                mdt_obd_name(info->mti_mdt),
2439                                PFID(ofid), rc);
2440                         /* continue to close even if error occurred. */
2441                 }
2442                 break;
2443         default:
2444                 /* nothing */
2445                 break;
2446         }
2447
2448         if (S_ISREG(lu_object_attr(&o->mot_obj)) &&
2449             ma->ma_attr.la_valid & (LA_LSIZE | LA_LBLOCKS)) {
2450                 rc2 = mdt_lsom_update(info, o, false);
2451                 if (rc2 < 0) {
2452                         CDEBUG(D_INODE,
2453                                "%s: File " DFID " LSOM failed: rc = %d\n",
2454                                mdt_obd_name(info->mti_mdt),
2455                                PFID(ofid), rc2);
2456                         if (rc == 0)
2457                                 rc = rc2;
2458                         /* continue to close even if error occurred. */
2459                 }
2460         }
2461
2462         if (open_flags & MDS_FMODE_WRITE)
2463                 mdt_write_put(o);
2464         else if (open_flags & MDS_FMODE_EXEC)
2465                 mdt_write_allow(o);
2466
2467         /* Update atime|mtime|ctime on close. */
2468         if ((open_flags & MDS_FMODE_EXEC || open_flags & MDS_FMODE_READ ||
2469              open_flags & MDS_FMODE_WRITE) && (ma->ma_valid & MA_INODE) &&
2470             (ma->ma_attr.la_valid & LA_ATIME ||
2471              ma->ma_attr.la_valid & LA_MTIME ||
2472              ma->ma_attr.la_valid & LA_CTIME)) {
2473                 ma->ma_valid = MA_INODE;
2474                 ma->ma_attr_flags |= MDS_CLOSE_UPDATE_TIMES;
2475                 ma->ma_attr.la_valid &= (LA_ATIME | LA_MTIME | LA_CTIME);
2476
2477                 if (ma->ma_attr.la_valid & LA_MTIME) {
2478                         if (mdt_attr_get_pfid(info, o, &ma->ma_pfid) == 0)
2479                                 ma->ma_valid |= MA_PFID;
2480                 }
2481
2482                 rc2 = mo_attr_set(info->mti_env, next, ma);
2483                 if (rc2 != 0) {
2484                         CDEBUG(D_INODE,
2485                            "%s: File "DFID" set attr (%#llx) failed: rc = %d\n",
2486                                mdt_obd_name(info->mti_mdt), PFID(ofid),
2487                                ma->ma_attr.la_valid, rc2);
2488                         if (rc == 0)
2489                                 rc = rc2;
2490                 }
2491         }
2492
2493         /* If file data is modified, add the dirty flag. */
2494         if (ma->ma_attr_flags & MDS_DATA_MODIFIED) {
2495                 rc2 = mdt_add_dirty_flag(info, o, ma);
2496                 if (rc2 != 0) {
2497                         CDEBUG(D_INODE,
2498                              "%s: File "DFID" add dirty flag failed: rc = %d\n",
2499                                mdt_obd_name(info->mti_mdt), PFID(ofid), rc2);
2500                         if (rc == 0)
2501                                 rc = rc2;
2502                 }
2503         }
2504
2505         ma->ma_need |= MA_INODE;
2506         ma->ma_valid &= ~MA_INODE;
2507
2508         LASSERT(atomic_read(&o->mot_open_count) > 0);
2509         atomic_dec(&o->mot_open_count);
2510         mdt_handle_last_unlink(info, o, ma);
2511
2512         if (!MFD_CLOSED(open_flags)) {
2513                 rc2 = mo_close(info->mti_env, next, ma, open_flags);
2514                 if (rc2 != 0) {
2515                         CDEBUG(D_INODE,
2516                                "%s: File "DFID" close failed: rc = %d\n",
2517                                mdt_obd_name(info->mti_mdt), PFID(ofid), rc2);
2518                         if (rc == 0)
2519                                 rc = rc2;
2520                 }
2521                 if (mdt_dom_check_for_discard(info, o))
2522                         mdt_dom_discard_data(info, o);
2523         }
2524
2525         /* adjust open and lease count */
2526         if (open_flags & MDS_OPEN_LEASE) {
2527                 LASSERT(atomic_read(&o->mot_lease_count) > 0);
2528                 atomic_dec(&o->mot_lease_count);
2529         }
2530
2531         mdt_mfd_free(mfd);
2532         mdt_object_put(info->mti_env, o);
2533
2534         RETURN(rc);
2535 }
2536
2537 int mdt_close_internal(struct mdt_thread_info *info, struct ptlrpc_request *req,
2538                        struct mdt_body *repbody)
2539 {
2540         struct mdt_export_data *med;
2541         struct mdt_file_data   *mfd;
2542         int ret = 0;
2543         int rc = 0;
2544
2545         ENTRY;
2546         med = &req->rq_export->exp_mdt_data;
2547         spin_lock(&med->med_open_lock);
2548         mfd = mdt_open_handle2mfd(med, &info->mti_open_handle,
2549                                   req_is_replay(req));
2550         if (mdt_mfd_closed(mfd)) {
2551                 spin_unlock(&med->med_open_lock);
2552                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID
2553                        ": cookie = %#llx\n", PFID(info->mti_rr.rr_fid1),
2554                        info->mti_open_handle.cookie);
2555                 /** not serious error since bug 3633 */
2556                 rc = -ESTALE;
2557         } else {
2558                 class_handle_unhash(&mfd->mfd_open_handle);
2559                 list_del_init(&mfd->mfd_list);
2560                 spin_unlock(&med->med_open_lock);
2561                 ret = mdt_mfd_close(info, mfd);
2562         }
2563
2564         RETURN(rc ? rc : ret);
2565 }
2566
2567 int mdt_close(struct tgt_session_info *tsi)
2568 {
2569         struct mdt_thread_info *info = tsi2mdt_info(tsi);
2570         struct ptlrpc_request *req = tgt_ses_req(tsi);
2571         struct md_attr *ma = &info->mti_attr;
2572         struct mdt_body *repbody = NULL;
2573         ktime_t kstart = ktime_get();
2574         int rc;
2575         int rc2;
2576
2577         ENTRY;
2578         /* Close may come with the Size-on-MDS update. Unpack it. */
2579         rc = mdt_close_unpack(info);
2580         if (rc)
2581                 GOTO(out, rc = err_serious(rc));
2582
2583         /* These fields are no longer used and are left for compatibility.
2584          * size is always zero */
2585         req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
2586                              0);
2587         req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
2588                              0);
2589         rc = req_capsule_server_pack(info->mti_pill);
2590         if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
2591                 mdt_client_compatibility(info);
2592                 if (rc == 0)
2593                         mdt_fix_reply(info);
2594                 mdt_exit_ucred(info);
2595                 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
2596         }
2597
2598         /* Continue to close handle even if we can not pack reply */
2599         if (rc == 0) {
2600                 repbody = req_capsule_server_get(info->mti_pill,
2601                                                  &RMF_MDT_BODY);
2602                 ma->ma_lmm = req_capsule_server_get(info->mti_pill,
2603                                                     &RMF_MDT_MD);
2604                 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
2605                                                        &RMF_MDT_MD,
2606                                                        RCL_SERVER);
2607                 ma->ma_need = MA_INODE | MA_LOV;
2608                 repbody->mbo_eadatasize = 0;
2609                 repbody->mbo_aclsize = 0;
2610         } else {
2611                 rc = err_serious(rc);
2612         }
2613
2614         rc2 = mdt_close_internal(info, req, repbody);
2615         if (rc2 != -ESTALE)
2616                 mdt_empty_transno(info, rc2);
2617         if (rc2 != 0 && rc == 0)
2618                 rc = rc2;
2619
2620         if (repbody != NULL) {
2621                 mdt_client_compatibility(info);
2622                 rc2 = mdt_fix_reply(info);
2623                 if (rc2 != 0 && rc == 0)
2624                         rc = rc2;
2625         }
2626
2627         mdt_exit_ucred(info);
2628         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
2629                 GOTO(out, rc = err_serious(-ENOMEM));
2630
2631         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
2632                                  OBD_FAIL_MDS_CLOSE_NET_REP))
2633                 tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
2634 out:
2635         mdt_thread_info_fini(info);
2636         if (rc == 0)
2637                 mdt_counter_incr(req, LPROC_MDT_CLOSE,
2638                                  ktime_us_delta(ktime_get(), kstart));
2639         RETURN(rc);
2640 }