Whamcloud - gitweb
LU-1406 ofd: add OBD methods to handle OST requests
[fs/lustre-release.git] / lustre / ofd / ofd_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_obd.c
37  *
38  * Author: Andreas Dilger <adilger@whamcloud.com>
39  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
40  * Author: Mike Pershin <tappro@whamcloud.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_FILTER
44
45 #include "ofd_internal.h"
46 #include <obd_cksum.h>
47
48 static int ofd_export_stats_init(struct ofd_device *ofd,
49                                  struct obd_export *exp, void *client_nid)
50 {
51         struct obd_device       *obd = ofd_obd(ofd);
52         struct nid_stat         *stats;
53         int                      num_stats, i;
54         int                      rc, newnid = 0;
55
56         ENTRY;
57
58         if (obd_uuid_equals(&exp->exp_client_uuid, &obd->obd_uuid))
59                 /* Self-export gets no proc entry */
60                 RETURN(0);
61
62         rc = lprocfs_exp_setup(exp, client_nid, &newnid);
63         if (rc) {
64                 /* Mask error for already created
65                  * /proc entries */
66                 if (rc == -EALREADY)
67                         rc = 0;
68                 RETURN(rc);
69         }
70
71         if (newnid == 0)
72                 RETURN(0);
73
74         stats = exp->exp_nid_stats;
75         LASSERT(stats != NULL);
76
77         OBD_ALLOC(stats->nid_brw_stats, sizeof(struct brw_stats));
78         if (stats->nid_brw_stats == NULL)
79                 GOTO(clean, rc = -ENOMEM);
80
81         for (i = 0; i < BRW_LAST; i++)
82                 cfs_spin_lock_init(&stats->nid_brw_stats->hist[i].oh_lock);
83
84         rc = lprocfs_seq_create(stats->nid_proc, "brw_stats", 0644,
85                                 &ofd_per_nid_stats_fops, stats);
86         if (rc)
87                 CWARN("Error adding the brw_stats file\n");
88
89         num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) +
90                      LPROC_OFD_LAST - 1;
91
92         stats->nid_stats = lprocfs_alloc_stats(num_stats,
93                                                LPROCFS_STATS_FLAG_NOPERCPU);
94         if (stats->nid_stats == NULL)
95                 return -ENOMEM;
96
97         lprocfs_init_ops_stats(LPROC_OFD_LAST, stats->nid_stats);
98         lprocfs_counter_init(stats->nid_stats, LPROC_OFD_READ_BYTES,
99                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
100         lprocfs_counter_init(stats->nid_stats, LPROC_OFD_WRITE_BYTES,
101                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
102
103         rc = lprocfs_register_stats(stats->nid_proc, "stats",
104                                     stats->nid_stats);
105         if (rc)
106                 GOTO(clean, rc);
107
108         rc = lprocfs_nid_ldlm_stats_init(stats);
109         if (rc) {
110                 lprocfs_free_stats(&stats->nid_stats);
111                 GOTO(clean, rc);
112         }
113
114         RETURN(0);
115 clean:
116         return rc;
117 }
118
119 static int ofd_parse_connect_data(const struct lu_env *env,
120                                   struct obd_export *exp,
121                                   struct obd_connect_data *data)
122 {
123         struct ofd_device                *ofd = ofd_exp(exp);
124         struct filter_export_data        *fed = &exp->exp_filter_data;
125
126         if (!data)
127                 RETURN(0);
128
129         CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
130                " ocd_version: %x ocd_grant: %d ocd_index: %u\n",
131                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
132                data->ocd_connect_flags, data->ocd_version,
133                data->ocd_grant, data->ocd_index);
134
135         if (fed->fed_group != 0 && fed->fed_group != data->ocd_group) {
136                 CWARN("!!! This export (nid %s) used object group %d "
137                       "earlier; now it's trying to use group %d!  This could "
138                       "be a bug in the MDS. Please report to "
139                       "http://bugs.whamcloud.com/\n",
140                       obd_export_nid2str(exp), fed->fed_group,
141                       data->ocd_group);
142                 RETURN(-EPROTO);
143         }
144         fed->fed_group = data->ocd_group;
145
146         data->ocd_connect_flags &= OST_CONNECT_SUPPORTED;
147         exp->exp_connect_flags = data->ocd_connect_flags;
148         data->ocd_version = LUSTRE_VERSION_CODE;
149
150         /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */
151         if (data->ocd_connect_flags & OBD_CONNECT_MDS)
152                 CDEBUG(D_HA, "%s: Received MDS connection for group %u\n",
153                        exp->exp_obd->obd_name, data->ocd_group);
154         else if (data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN)
155                 RETURN(-EPROTO);
156
157         if (ofd_grant_param_supp(exp)) {
158                 exp->exp_filter_data.fed_pagesize = data->ocd_blocksize;
159                 /* ocd_{blocksize,inodespace} are log2 values */
160                 data->ocd_blocksize  = ofd->ofd_blockbits;
161                 data->ocd_inodespace = ofd->ofd_dt_conf.ddp_inodespace;
162                 /* ocd_grant_extent is in 1K blocks */
163                 data->ocd_grant_extent = ofd->ofd_dt_conf.ddp_grant_frag >> 10;
164         }
165
166         if (exp->exp_connect_flags & OBD_CONNECT_GRANT)
167                 data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant);
168
169         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
170                 struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd;
171                 int                    index = lsd->lsd_ost_index;
172
173                 if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) {
174                         /* this will only happen on the first connect */
175                         lsd->lsd_ost_index = data->ocd_index;
176                         lsd->lsd_feature_compat |= OBD_COMPAT_OST;
177                         /* sync is not needed here as lut_client_add will
178                          * set exp_need_sync flag */
179                         lut_server_data_update(env, &ofd->ofd_lut, 0);
180                 } else if (index != data->ocd_index) {
181                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
182                                            " %u doesn't match actual OST index"
183                                            " %u in last_rcvd file, bad "
184                                            "configuration?\n",
185                                            obd_export_nid2str(exp), index,
186                                            data->ocd_index);
187                         RETURN(-EBADF);
188                 }
189         }
190
191         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_SIZE)) {
192                 data->ocd_brw_size = 65536;
193         } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
194                 data->ocd_brw_size = min(data->ocd_brw_size,
195                               (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
196                 if (data->ocd_brw_size == 0) {
197                         CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
198                                " ocd_version: %x ocd_grant: %d ocd_index: %u "
199                                "ocd_brw_size is unexpectedly zero, "
200                                "network data corruption?"
201                                "Refusing connection of this client\n",
202                                exp->exp_obd->obd_name,
203                                exp->exp_client_uuid.uuid,
204                                exp, data->ocd_connect_flags, data->ocd_version,
205                                data->ocd_grant, data->ocd_index);
206                         RETURN(-EPROTO);
207                 }
208         }
209
210         if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) {
211                 __u32 cksum_types = data->ocd_cksum_types;
212
213                 /* The client set in ocd_cksum_types the checksum types it
214                  * supports. We have to mask off the algorithms that we don't
215                  * support */
216                 data->ocd_cksum_types &= cksum_types_supported();
217
218                 if (unlikely(data->ocd_cksum_types == 0)) {
219                         CERROR("%s: Connect with checksum support but no "
220                                "ocd_cksum_types is set\n",
221                                exp->exp_obd->obd_name);
222                         RETURN(-EPROTO);
223                 }
224
225                 CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
226                        "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp),
227                        cksum_types, data->ocd_cksum_types);
228         } else {
229                 /* This client does not support OBD_CONNECT_CKSUM
230                  * fall back to CRC32 */
231                 CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
232                        "OBD_CONNECT_CKSUM, CRC32 will be used\n",
233                        exp->exp_obd->obd_name, obd_export_nid2str(exp));
234         }
235
236         if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES)
237                 data->ocd_maxbytes = ofd->ofd_dt_conf.ddp_maxbytes;
238
239         RETURN(0);
240 }
241
242 static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp,
243                              struct obd_device *obd, struct obd_uuid *cluuid,
244                              struct obd_connect_data *data, void *localdata)
245 {
246         struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
247         int                      rc;
248
249         ENTRY;
250
251         if (exp == NULL || obd == NULL || cluuid == NULL)
252                 RETURN(-EINVAL);
253
254         rc = lu_env_refill((struct lu_env *)env);
255         if (rc != 0) {
256                 CERROR("Failure to refill session: '%d'\n", rc);
257                 RETURN(rc);
258         }
259
260         ofd_info_init(env, exp);
261         rc = ofd_parse_connect_data(env, exp, data);
262         if (rc == 0)
263                 ofd_export_stats_init(ofd, exp, localdata);
264
265         RETURN(rc);
266 }
267
268 static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp,
269                            struct obd_device *obd, struct obd_uuid *cluuid,
270                            struct obd_connect_data *data, void *localdata)
271 {
272         struct obd_export       *exp;
273         struct ofd_device       *ofd;
274         struct lustre_handle     conn = { 0 };
275         int                      rc, group;
276
277         ENTRY;
278
279         if (_exp == NULL || obd == NULL || cluuid == NULL)
280                 RETURN(-EINVAL);
281
282         ofd = ofd_dev(obd->obd_lu_dev);
283
284         rc = class_connect(&conn, obd, cluuid);
285         if (rc)
286                 RETURN(rc);
287
288         exp = class_conn2export(&conn);
289         LASSERT(exp != NULL);
290
291         rc = lu_env_refill((struct lu_env *)env);
292         if (rc != 0) {
293                 CERROR("Failure to refill session: '%d'\n", rc);
294                 GOTO(out, rc);
295         }
296
297         ofd_info_init(env, exp);
298
299         rc = ofd_parse_connect_data(env, exp, data);
300         if (rc)
301                 GOTO(out, rc);
302
303         group = data->ocd_group;
304         if (obd->obd_replayable) {
305                 struct tg_export_data *ted = &exp->exp_target_data;
306
307                 memcpy(ted->ted_lcd->lcd_uuid, cluuid,
308                        sizeof(ted->ted_lcd->lcd_uuid));
309                 rc = lut_client_new(env, exp);
310                 if (rc != 0)
311                         GOTO(out, rc);
312                 ofd_export_stats_init(ofd, exp, localdata);
313         }
314         if (group == 0)
315                 GOTO(out, rc = 0);
316
317         /* init new group */
318         if (group > ofd->ofd_max_group) {
319                 ofd->ofd_max_group = group;
320                 rc = ofd_group_load(env, ofd, group);
321         }
322 out:
323         if (rc != 0) {
324                 class_disconnect(exp);
325                 *_exp = NULL;
326         } else {
327                 *_exp = exp;
328         }
329         RETURN(rc);
330 }
331
332 static int ofd_obd_disconnect(struct obd_export *exp)
333 {
334         struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
335         struct lu_env            env;
336         int                      rc;
337
338         ENTRY;
339
340         LASSERT(exp);
341         class_export_get(exp);
342
343         if (!(exp->exp_flags & OBD_OPT_FORCE))
344                 ofd_grant_sanity_check(ofd_obd(ofd), __FUNCTION__);
345
346         rc = server_disconnect_export(exp);
347
348         ofd_grant_discard(exp);
349
350         rc = lu_env_init(&env, LCT_DT_THREAD);
351         if (rc)
352                 RETURN(rc);
353
354         /* Do not erase record for recoverable client. */
355         if (exp->exp_obd->obd_replayable &&
356             (!exp->exp_obd->obd_fail || exp->exp_failed))
357                 lut_client_del(&env, exp);
358         lu_env_fini(&env);
359
360         class_export_put(exp);
361         RETURN(rc);
362 }
363
364 static int ofd_init_export(struct obd_export *exp)
365 {
366         int rc;
367
368         cfs_spin_lock_init(&exp->exp_filter_data.fed_lock);
369         CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
370         cfs_spin_lock(&exp->exp_lock);
371         exp->exp_connecting = 1;
372         cfs_spin_unlock(&exp->exp_lock);
373
374         /* self-export doesn't need client data and ldlm initialization */
375         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
376                                      &exp->exp_client_uuid)))
377                 return 0;
378
379         rc = lut_client_alloc(exp);
380         if (rc == 0)
381                 ldlm_init_export(exp);
382         if (rc)
383                 CERROR("%s: Can't initialize export: rc %d\n",
384                        exp->exp_obd->obd_name, rc);
385         return rc;
386 }
387
388 static int ofd_destroy_export(struct obd_export *exp)
389 {
390         struct ofd_device *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
391
392         if (exp->exp_filter_data.fed_pending)
393                 CERROR("%s: cli %s/%p has %lu pending on destroyed export"
394                        "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
395                        exp, exp->exp_filter_data.fed_pending);
396
397         target_destroy_export(exp);
398
399         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
400                                      &exp->exp_client_uuid)))
401                 return 0;
402
403         ldlm_destroy_export(exp);
404         lut_client_free(exp);
405
406         ofd_fmd_cleanup(exp);
407
408         /*
409          * discard grants once we're sure no more
410          * interaction with the client is possible
411          */
412         ofd_grant_discard(exp);
413         ofd_fmd_cleanup(exp);
414
415         if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) {
416                 if (ofd->ofd_tot_granted_clients > 0)
417                         ofd->ofd_tot_granted_clients --;
418         }
419
420         if (!(exp->exp_flags & OBD_OPT_FORCE))
421                 ofd_grant_sanity_check(exp->exp_obd, __FUNCTION__);
422
423         LASSERT(cfs_list_empty(&exp->exp_filter_data.fed_mod_list));
424         return 0;
425 }
426
427 int ofd_obd_postrecov(struct obd_device *obd)
428 {
429         struct lu_env            env;
430         struct lu_device        *ldev = obd->obd_lu_dev;
431         int                      rc;
432
433         ENTRY;
434
435         rc = lu_env_init(&env, LCT_DT_THREAD);
436         if (rc)
437                 RETURN(rc);
438         ofd_info_init(&env, obd->obd_self_export);
439
440         rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev);
441         lu_env_fini(&env);
442         RETURN(rc);
443 }
444
445 static int ofd_adapt_sptlrpc_conf(const struct lu_env *env,
446                                   struct obd_device *obd, int initial)
447 {
448         struct filter_obd       *fo = &obd->u.filter;
449         struct sptlrpc_rule_set  tmp_rset;
450         int                      rc;
451
452         sptlrpc_rule_set_init(&tmp_rset);
453         rc = sptlrpc_conf_target_get_rules(obd, &tmp_rset, initial);
454         if (rc) {
455                 CERROR("%s: failed get sptlrpc rules: rc = %d\n",
456                        obd->obd_name, rc);
457                 return rc;
458         }
459
460         sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
461
462         cfs_write_lock(&fo->fo_sptlrpc_lock);
463         sptlrpc_rule_set_free(&fo->fo_sptlrpc_rset);
464         fo->fo_sptlrpc_rset = tmp_rset;
465         cfs_write_unlock(&fo->fo_sptlrpc_lock);
466
467         return 0;
468 }
469
470 static int ofd_set_mds_conn(struct obd_export *exp, void *val)
471 {
472         int rc = 0;
473
474         ENTRY;
475
476         LCONSOLE_WARN("%s: received MDS connection from %s\n",
477                       exp->exp_obd->obd_name, obd_export_nid2str(exp));
478         RETURN(rc);
479 }
480
481 static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp,
482                               __u32 keylen, void *key, __u32 vallen, void *val,
483                               struct ptlrpc_request_set *set)
484 {
485         struct ofd_device       *ofd = ofd_exp(exp);
486         int                      rc = 0;
487
488         ENTRY;
489
490         if (exp->exp_obd == NULL) {
491                 CDEBUG(D_IOCTL, "invalid export %p\n", exp);
492                 RETURN(-EINVAL);
493         }
494
495         if (KEY_IS(KEY_CAPA_KEY)) {
496                 rc = ofd_update_capa_key(ofd, val);
497                 if (rc)
498                         CERROR("%s: update capability key failed: rc = %d\n",
499                                exp->exp_obd->obd_name, rc);
500         } else if (KEY_IS(KEY_SPTLRPC_CONF)) {
501                 ofd_adapt_sptlrpc_conf(env, exp->exp_obd, 0);
502         } else if (KEY_IS(KEY_MDS_CONN)) {
503                 rc = ofd_set_mds_conn(exp, val);
504         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
505                 struct ost_body *body = val;
506
507                 /** handle grant shrink, similar to a read request */
508                 ofd_grant_prepare_read(env, exp, &body->oa);
509         } else {
510                 CERROR("%s: Unsupported key %s\n",
511                        exp->exp_obd->obd_name, (char*)key);
512                 rc = -EOPNOTSUPP;
513         }
514
515         RETURN(rc);
516 }
517
518 static int ofd_get_info(const struct lu_env *env, struct obd_export *exp,
519                         __u32 keylen, void *key, __u32 *vallen, void *val,
520                         struct lov_stripe_md *lsm)
521 {
522         struct ofd_device       *ofd = ofd_exp(exp);
523         int                      rc = 0;
524
525         ENTRY;
526
527         if (exp->exp_obd == NULL) {
528                 CDEBUG(D_IOCTL, "invalid client export %p\n", exp);
529                 RETURN(-EINVAL);
530         }
531
532         if (KEY_IS(KEY_BLOCKSIZE)) {
533                 __u32 *blocksize = val;
534                 if (blocksize) {
535                         if (*vallen < sizeof(*blocksize))
536                                 RETURN(-EOVERFLOW);
537                         *blocksize = 1 << ofd->ofd_dt_conf.ddp_block_shift;
538                 }
539                 *vallen = sizeof(*blocksize);
540         } else if (KEY_IS(KEY_BLOCKSIZE_BITS)) {
541                 __u32 *blocksize_bits = val;
542                 if (blocksize_bits) {
543                         if (*vallen < sizeof(*blocksize_bits))
544                                 RETURN(-EOVERFLOW);
545                         *blocksize_bits = ofd->ofd_dt_conf.ddp_block_shift;
546                 }
547                 *vallen = sizeof(*blocksize_bits);
548         } else if (KEY_IS(KEY_LAST_ID)) {
549                 obd_id *last_id = val;
550                 if (last_id) {
551                         if (*vallen < sizeof(*last_id))
552                                 RETURN(-EOVERFLOW);
553                         *last_id = ofd_last_id(ofd,
554                                                exp->exp_filter_data.fed_group);
555                 }
556                 *vallen = sizeof(*last_id);
557         } else if (KEY_IS(KEY_FIEMAP)) {
558                 struct ofd_thread_info          *info;
559                 struct ofd_device               *ofd = ofd_exp(exp);
560                 struct ofd_object               *fo;
561                 struct ll_fiemap_info_key       *fm_key = key;
562
563                 if (val == NULL) {
564                         *vallen = fiemap_count_to_size(
565                                                fm_key->fiemap.fm_extent_count);
566                         RETURN(0);
567                 }
568
569                 info = ofd_info_init(env, exp);
570
571                 fid_ostid_unpack(&info->fti_fid, &fm_key->oa.o_oi, 0);
572
573                 CDEBUG(D_INODE, "get FIEMAP of object "DFID"\n",
574                        PFID(&info->fti_fid));
575
576                 fo = ofd_object_find(env, ofd, &info->fti_fid);
577                 if (IS_ERR(fo)) {
578                         CERROR("%s: error finding object "DFID"\n",
579                                exp->exp_obd->obd_name, PFID(&info->fti_fid));
580                         rc = PTR_ERR(fo);
581                 } else {
582                         struct ll_user_fiemap *fiemap = val;
583
584                         ofd_read_lock(env, fo);
585                         if (ofd_object_exists(fo)) {
586                                 *fiemap = fm_key->fiemap;
587                                 rc = dt_fiemap_get(env,
588                                                    ofd_object_child(fo),
589                                                    fiemap);
590                         } else {
591                                 rc = -ENOENT;
592                         }
593                         ofd_read_unlock(env, fo);
594                         ofd_object_put(env, fo);
595                 }
596         } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) {
597                 *((__u32 *) val) = ofd->ofd_sync_lock_cancel;
598                 *vallen = sizeof(__u32);
599         } else {
600                 CERROR("Not supported key %s\n", (char*)key);
601                 rc = -EOPNOTSUPP;
602         }
603
604         RETURN(rc);
605 }
606
607 /** helper function for statfs, also used by grant code */
608 int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
609                         struct obd_statfs *osfs, __u64 max_age, int *from_cache)
610 {
611         int rc;
612
613         cfs_spin_lock(&ofd->ofd_osfs_lock);
614         if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) {
615                 obd_size unstable;
616
617                 /* statfs data are too old, get up-to-date one.
618                  * we must be cautious here since multiple threads might be
619                  * willing to update statfs data concurrently and we must
620                  * grant that cached statfs data are always consistent */
621
622                 if (ofd->ofd_statfs_inflight == 0)
623                         /* clear inflight counter if no users, although it would
624                          * take a while to overflow this 64-bit counter ... */
625                         ofd->ofd_osfs_inflight = 0;
626                 /* notify ofd_grant_commit() that we want to track writes
627                  * completed as of now */
628                 ofd->ofd_statfs_inflight++;
629                 /* record value of inflight counter before running statfs to
630                  * compute the diff once statfs is completed */
631                 unstable = ofd->ofd_osfs_inflight;
632                 cfs_spin_unlock(&ofd->ofd_osfs_lock);
633
634                 /* statfs can sleep ... hopefully not for too long since we can
635                  * call it fairly often as space fills up */
636                 rc = dt_statfs(env, ofd->ofd_osd, osfs);
637                 if (unlikely(rc))
638                         return rc;
639
640                 cfs_spin_lock(&ofd->ofd_grant_lock);
641                 cfs_spin_lock(&ofd->ofd_osfs_lock);
642                 /* calculate how much space was written while we released the
643                  * ofd_osfs_lock */
644                 unstable = ofd->ofd_osfs_inflight - unstable;
645                 ofd->ofd_osfs_unstable = 0;
646                 if (unstable) {
647                         /* some writes completed while we were running statfs
648                          * w/o the ofd_osfs_lock. Those ones got added to
649                          * the cached statfs data that we are about to crunch.
650                          * Take them into account in the new statfs data */
651                         osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
652                                                unstable >> ofd->ofd_blockbits);
653                         /* However, we don't really know if those writes got
654                          * accounted in the statfs call, so tell
655                          * ofd_grant_space_left() there is some uncertainty
656                          * on the accounting of those writes.
657                          * The purpose is to prevent spurious error messages in
658                          * ofd_grant_space_left() since those writes might be
659                          * accounted twice. */
660                         ofd->ofd_osfs_unstable += unstable;
661                 }
662                 /* similarly, there is some uncertainty on write requests
663                  * between prepare & commit */
664                 ofd->ofd_osfs_unstable += ofd->ofd_tot_pending;
665                 cfs_spin_unlock(&ofd->ofd_grant_lock);
666
667                 /* finally udpate cached statfs data */
668                 ofd->ofd_osfs = *osfs;
669                 ofd->ofd_osfs_age = cfs_time_current_64();
670
671                 ofd->ofd_statfs_inflight--; /* stop tracking */
672                 if (ofd->ofd_statfs_inflight == 0)
673                         ofd->ofd_osfs_inflight = 0;
674                 cfs_spin_unlock(&ofd->ofd_osfs_lock);
675
676                 if (from_cache)
677                         *from_cache = 0;
678         } else {
679                 /* use cached statfs data */
680                 *osfs = ofd->ofd_osfs;
681                 cfs_spin_unlock(&ofd->ofd_osfs_lock);
682                 if (from_cache)
683                         *from_cache = 1;
684         }
685         return 0;
686 }
687
688 static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
689                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
690 {
691         struct obd_device       *obd = class_exp2obd(exp);
692         struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
693         int                      rc;
694
695         ENTRY;
696
697         rc = ofd_statfs_internal(env, ofd, osfs, max_age, NULL);
698         if (unlikely(rc))
699                 GOTO(out, rc);
700
701         /* at least try to account for cached pages.  its still racy and
702          * might be under-reporting if clients haven't announced their
703          * caches with brw recently */
704
705         CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
706                " pending "LPU64" free "LPU64" avail "LPU64"\n",
707                ofd->ofd_tot_dirty, ofd->ofd_tot_granted, ofd->ofd_tot_pending,
708                osfs->os_bfree << ofd->ofd_blockbits,
709                osfs->os_bavail << ofd->ofd_blockbits);
710
711         osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
712                                  ((ofd->ofd_tot_dirty + ofd->ofd_tot_pending +
713                                    osfs->os_bsize - 1) >> ofd->ofd_blockbits));
714
715         /* The QoS code on the MDS does not care about space reserved for
716          * precreate, so take it out. */
717         if (exp->exp_connect_flags & OBD_CONNECT_MDS) {
718                 struct filter_export_data *fed;
719
720                 fed = &obd->obd_self_export->exp_filter_data;
721                 osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
722                                          fed->fed_grant >> ofd->ofd_blockbits);
723         }
724
725         ofd_grant_sanity_check(obd, __FUNCTION__);
726         CDEBUG(D_CACHE, LPU64" blocks: "LPU64" free, "LPU64" avail; "
727                LPU64" objects: "LPU64" free; state %x\n",
728                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
729                osfs->os_files, osfs->os_ffree, osfs->os_state);
730
731         if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC,
732                                  ofd->ofd_lut.lut_lsd.lsd_ost_index))
733                 osfs->os_bfree = osfs->os_bavail = 2;
734
735         if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOINO,
736                                  ofd->ofd_lut.lut_lsd.lsd_ost_index))
737                 osfs->os_ffree = 0;
738
739         /* OS_STATE_READONLY can be set by OSD already */
740         if (ofd->ofd_raid_degraded)
741                 osfs->os_state |= OS_STATE_DEGRADED;
742
743         if (obd->obd_self_export != exp && ofd_grant_compat(exp, ofd)) {
744                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
745                  * should not see a block size > page size, otherwise
746                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
747                  * block size which is the biggest block size known to work
748                  * with all client's page size. */
749                 osfs->os_blocks <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
750                 osfs->os_bfree  <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
751                 osfs->os_bavail <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
752                 osfs->os_bsize    = 1 << COMPAT_BSIZE_SHIFT;
753         }
754
755         EXIT;
756 out:
757         return rc;
758 }
759
760 int ofd_setattr(const struct lu_env *env, struct obd_export *exp,
761                 struct obd_info *oinfo, struct obd_trans_info *oti)
762 {
763         struct ofd_thread_info  *info;
764         struct ofd_device       *ofd = ofd_exp(exp);
765         struct ldlm_namespace   *ns = ofd->ofd_namespace;
766         struct ldlm_resource    *res;
767         struct ofd_object       *fo;
768         struct obdo             *oa = oinfo->oi_oa;
769         struct filter_fid       *ff = NULL;
770         int                      rc = 0;
771
772         ENTRY;
773
774         info = ofd_info_init(env, exp);
775         ofd_oti2info(info, oti);
776
777         fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0);
778         ofd_build_resid(&info->fti_fid, &info->fti_resid);
779
780         rc = ofd_auth_capa(exp, &info->fti_fid, oa->o_seq,
781                            oinfo_capa(oinfo), CAPA_OPC_META_WRITE);
782         if (rc)
783                 GOTO(out, rc);
784
785         /* This would be very bad - accidentally truncating a file when
786          * changing the time or similar - bug 12203. */
787         if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE &&
788             oinfo->oi_policy.l_extent.end != OBD_OBJECT_EOF) {
789                 static char mdsinum[48];
790
791                 if (oinfo->oi_oa->o_valid & OBD_MD_FLFID)
792                         snprintf(mdsinum, sizeof(mdsinum) - 1,
793                                  "of parent "DFID, oinfo->oi_oa->o_parent_seq,
794                                  oinfo->oi_oa->o_parent_oid, 0);
795                 else
796                         mdsinum[0] = '\0';
797
798                 CERROR("%s: setattr from %s trying to truncate object "DFID
799                        " %s\n", exp->exp_obd->obd_name,
800                        obd_export_nid2str(exp), PFID(&info->fti_fid), mdsinum);
801                 GOTO(out, rc = -EPERM);
802         }
803
804         fo = ofd_object_find(env, ofd, &info->fti_fid);
805         if (IS_ERR(fo)) {
806                 CERROR("%s: can't find object "DFID"\n",
807                        exp->exp_obd->obd_name, PFID(&info->fti_fid));
808                 GOTO(out, rc = PTR_ERR(fo));
809         }
810
811         la_from_obdo(&info->fti_attr, oinfo->oi_oa, oinfo->oi_oa->o_valid);
812         info->fti_attr.la_valid &= ~LA_TYPE;
813
814         if (oa->o_valid & OBD_MD_FLFID) {
815                 ff = &info->fti_mds_fid;
816                 ofd_prepare_fidea(ff, oa);
817         }
818
819         /* setting objects attributes (including owner/group) */
820         rc = ofd_attr_set(env, fo, &info->fti_attr, ff);
821         if (rc)
822                 GOTO(out_unlock, rc);
823
824         res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0);
825         if (res != NULL) {
826                 ldlm_res_lvbo_update(res, NULL, 0);
827                 ldlm_resource_putref(res);
828         }
829
830         oinfo->oi_oa->o_valid = OBD_MD_FLID;
831
832         /* Quota release needs uid/gid info */
833         rc = ofd_attr_get(env, fo, &info->fti_attr);
834         obdo_from_la(oinfo->oi_oa, &info->fti_attr,
835                      OFD_VALID_FLAGS | LA_UID | LA_GID);
836         ofd_info2oti(info, oti);
837 out_unlock:
838         ofd_object_put(env, fo);
839 out:
840         RETURN(rc);
841 }
842
843 static int ofd_punch(const struct lu_env *env, struct obd_export *exp,
844                      struct obd_info *oinfo, struct obd_trans_info *oti,
845                      struct ptlrpc_request_set *rqset)
846 {
847         struct ofd_thread_info  *info;
848         struct ofd_device       *ofd = ofd_exp(exp);
849         struct ldlm_namespace   *ns = ofd->ofd_namespace;
850         struct ldlm_resource    *res;
851         struct ofd_object       *fo;
852         struct filter_fid       *ff = NULL;
853         int                      rc = 0;
854
855         ENTRY;
856
857         info = ofd_info_init(env, exp);
858         ofd_oti2info(info, oti);
859
860         fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0);
861         ofd_build_resid(&info->fti_fid, &info->fti_resid);
862
863         CDEBUG(D_INODE, "calling punch for object "DFID", valid = "LPX64
864                ", start = "LPD64", end = "LPD64"\n", PFID(&info->fti_fid),
865                oinfo->oi_oa->o_valid, oinfo->oi_policy.l_extent.start,
866                oinfo->oi_policy.l_extent.end);
867
868         rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq,
869                            oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC);
870         if (rc)
871                 GOTO(out_env, rc);
872
873         fo = ofd_object_find(env, ofd, &info->fti_fid);
874         if (IS_ERR(fo)) {
875                 CERROR("%s: error finding object "DFID": rc = %ld\n",
876                        exp->exp_obd->obd_name, PFID(&info->fti_fid),
877                        PTR_ERR(fo));
878                 GOTO(out_env, rc = PTR_ERR(fo));
879         }
880
881         LASSERT(oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF);
882         if (oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF) {
883                 /* Truncate case */
884                 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
885         } else if (oinfo->oi_policy.l_extent.end >= oinfo->oi_oa->o_size) {
886                 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.end;
887         }
888
889         la_from_obdo(&info->fti_attr, oinfo->oi_oa,
890                      OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
891         info->fti_attr.la_valid &= ~LA_TYPE;
892         info->fti_attr.la_size = oinfo->oi_policy.l_extent.start;
893         info->fti_attr.la_valid |= LA_SIZE;
894
895         if (oinfo->oi_oa->o_valid & OBD_MD_FLFID) {
896                 ff = &info->fti_mds_fid;
897                 ofd_prepare_fidea(ff, oinfo->oi_oa);
898         }
899
900         rc = ofd_object_punch(env, fo, oinfo->oi_policy.l_extent.start,
901                               oinfo->oi_policy.l_extent.end, &info->fti_attr,
902                               ff);
903         if (rc)
904                 GOTO(out, rc);
905
906         res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0);
907         if (res != NULL) {
908                 ldlm_res_lvbo_update(res, NULL, 0);
909                 ldlm_resource_putref(res);
910         }
911
912         oinfo->oi_oa->o_valid = OBD_MD_FLID;
913         /* Quota release needs uid/gid info */
914         rc = ofd_attr_get(env, fo, &info->fti_attr);
915         obdo_from_la(oinfo->oi_oa, &info->fti_attr,
916                      OFD_VALID_FLAGS | LA_UID | LA_GID);
917         ofd_info2oti(info, oti);
918 out:
919         ofd_object_put(env, fo);
920 out_env:
921         RETURN(rc);
922 }
923
924 static int ofd_destroy_by_fid(const struct lu_env *env,
925                               struct ofd_device *ofd,
926                               const struct lu_fid *fid, int orphan)
927 {
928         struct ofd_thread_info  *info = ofd_info(env);
929         struct lustre_handle     lockh;
930         int                      flags = LDLM_AST_DISCARD_DATA, rc = 0;
931         ldlm_policy_data_t       policy = {
932                                         .l_extent = { 0, OBD_OBJECT_EOF }
933                                  };
934         struct ofd_object       *fo;
935
936         ENTRY;
937
938         /* Tell the clients that the object is gone now and that they should
939          * throw away any cached pages. */
940         ofd_build_resid(fid, &info->fti_resid);
941         rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid,
942                                     LDLM_EXTENT, &policy, LCK_PW, &flags,
943                                     ldlm_blocking_ast, ldlm_completion_ast,
944                                     NULL, NULL, 0, NULL, &lockh);
945
946         /* We only care about the side-effects, just drop the lock. */
947         if (rc == ELDLM_OK)
948                 ldlm_lock_decref(&lockh, LCK_PW);
949
950         fo = ofd_object_find(env, ofd, fid);
951         if (IS_ERR(fo))
952                 RETURN(PTR_ERR(fo));
953         LASSERT(fo != NULL);
954
955         rc = ofd_object_destroy(env, fo, orphan);
956
957         ofd_object_put(env, fo);
958         RETURN(rc);
959 }
960
961 int ofd_destroy(const struct lu_env *env, struct obd_export *exp,
962                 struct obdo *oa, struct lov_stripe_md *md,
963                 struct obd_trans_info *oti, struct obd_export *md_exp,
964                 void *capa)
965 {
966         struct ofd_device       *ofd = ofd_exp(exp);
967         struct ofd_thread_info  *info;
968         obd_count                count;
969         int                      rc = 0;
970
971         ENTRY;
972
973         info = ofd_info_init(env, exp);
974         ofd_oti2info(info, oti);
975
976         if (!(oa->o_valid & OBD_MD_FLGROUP))
977                 oa->o_seq = 0;
978
979         /* check that o_misc makes sense */
980         if (oa->o_valid & OBD_MD_FLOBJCOUNT)
981                 count = oa->o_misc;
982         else
983                 count = 1; /* default case - single destroy */
984
985         /**
986          * There can be sequence of objects to destroy. Therefore this request
987          * may have multiple transaction involved in. It is OK, we need only
988          * the highest used transno to be reported back in reply but not for
989          * replays, they must report their transno
990          */
991         if (info->fti_transno == 0) /* not replay */
992                 info->fti_mult_trans = 1;
993         while (count > 0) {
994                 int lrc;
995
996                 fid_ostid_unpack(&info->fti_fid, &oa->o_oi, 0);
997                 lrc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 0);
998                 if (lrc == -ENOENT) {
999                         CDEBUG(D_INODE,
1000                                "destroying non-existent object "LPU64"\n",
1001                                oa->o_id);
1002                         /* rewrite rc with -ENOENT only if it is 0 */
1003                         if (rc == 0)
1004                                 rc = lrc;
1005                 } else if (lrc != 0) {
1006                         CEMERG("error destroying object "LPU64": %d\n",
1007                                oa->o_id, rc);
1008                         rc = lrc;
1009                 }
1010                 count--;
1011                 oa->o_id++;
1012         }
1013
1014         /* if we have transaction then there were some deletions, we don't
1015          * need to return ENOENT in that case because it will not wait
1016          * for commit of these deletions. The ENOENT must be returned only
1017          * if there were no transations.
1018          */
1019         if (rc == -ENOENT) {
1020                 if (info->fti_transno != 0)
1021                         rc = 0;
1022         } else if (rc != 0) {
1023                 /*
1024                  * If we have at least one transaction then llog record
1025                  * on server will be removed upon commit, so for rc != 0
1026                  * we return no transno and llog record will be reprocessed.
1027                  */
1028                 info->fti_transno = 0;
1029         }
1030         ofd_info2oti(info, oti);
1031         RETURN(rc);
1032 }
1033
1034 static int ofd_orphans_destroy(const struct lu_env *env,
1035                                struct obd_export *exp, struct ofd_device *ofd,
1036                                struct obdo *oa)
1037 {
1038         struct ofd_thread_info  *info = ofd_info(env);
1039         obd_id                   last;
1040         int                      skip_orphan;
1041         int                      rc = 0;
1042         struct ost_id            oi = oa->o_oi;
1043
1044         ENTRY;
1045
1046         LASSERT(exp != NULL);
1047         skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN);
1048
1049         last = ofd_last_id(ofd, oa->o_seq);
1050         CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
1051               ofd_obd(ofd)->obd_name, oa->o_id + 1, last);
1052
1053         for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) {
1054                 fid_ostid_unpack(&info->fti_fid, &oi, 0);
1055                 rc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 1);
1056                 if (rc && rc != -ENOENT) /* this is pretty fatal... */
1057                         CEMERG("error destroying precreated id "LPU64": %d\n",
1058                                oi.oi_id, rc);
1059                 if (!skip_orphan) {
1060                         ofd_last_id_set(ofd, oi.oi_id - 1, oa->o_seq);
1061                         /* update last_id on disk periodically so that if we
1062                          * restart * we don't need to re-scan all of the just
1063                          * deleted objects. */
1064                         if ((oi.oi_id & 511) == 0)
1065                                 ofd_last_id_write(env, ofd, oa->o_seq);
1066                 }
1067         }
1068         CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
1069                ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id);
1070         if (!skip_orphan) {
1071                 rc = ofd_last_id_write(env, ofd, oa->o_seq);
1072         } else {
1073                 /* don't reuse orphan object, return last used objid */
1074                 oa->o_id = last;
1075                 rc = 0;
1076         }
1077         RETURN(rc);
1078 }
1079
1080 int ofd_create(const struct lu_env *env, struct obd_export *exp,
1081                struct obdo *oa, struct lov_stripe_md **ea,
1082                struct obd_trans_info *oti)
1083 {
1084         struct ofd_device       *ofd = ofd_exp(exp);
1085         struct ofd_thread_info  *info;
1086         int                      rc = 0, diff;
1087
1088         ENTRY;
1089
1090         info = ofd_info_init(env, exp);
1091         ofd_oti2info(info, oti);
1092
1093         LASSERT(oa->o_seq >= FID_SEQ_OST_MDT0);
1094         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1095
1096         CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n",
1097                oa->o_seq, oa->o_id);
1098
1099         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1100             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1101                 if (!ofd_obd(ofd)->obd_recovering ||
1102                     oa->o_id > ofd_last_id(ofd, oa->o_seq)) {
1103                         CERROR("recreate objid "LPU64" > last id "LPU64"\n",
1104                                         oa->o_id, ofd_last_id(ofd, oa->o_seq));
1105                         GOTO(out, rc = -EINVAL);
1106                 }
1107                 /* do nothing because we create objects during first write */
1108                 GOTO(out, rc = 0);
1109         }
1110         /* former ofd_handle_precreate */
1111         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1112             (oa->o_flags & OBD_FL_DELORPHAN)) {
1113                 /* destroy orphans */
1114                 if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
1115                         CERROR("%s: dropping old orphan cleanup request\n",
1116                                ofd_obd(ofd)->obd_name);
1117                         GOTO(out, rc = 0);
1118                 }
1119                 /* This causes inflight precreates to abort and drop lock */
1120                 cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
1121                 cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]);
1122                 if (!cfs_test_bit(oa->o_seq, &ofd->ofd_destroys_in_progress)) {
1123                         CERROR("%s:["LPU64"] destroys_in_progress already cleared\n",
1124                                exp->exp_obd->obd_name, oa->o_seq);
1125                         GOTO(out, rc = 0);
1126                 }
1127                 diff = oa->o_id - ofd_last_id(ofd, oa->o_seq);
1128                 CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n",
1129                        ofd_last_id(ofd, oa->o_seq), diff);
1130                 if (-diff > OST_MAX_PRECREATE) {
1131                         /* FIXME: should reset precreate_next_id on MDS */
1132                         rc = 0;
1133                 } else if (diff < 0) {
1134                         rc = ofd_orphans_destroy(env, exp, ofd, oa);
1135                         cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
1136                 } else {
1137                         /* XXX: Used by MDS for the first time! */
1138                         cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
1139                 }
1140         } else {
1141                 cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]);
1142                 if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
1143                         CERROR("%s: dropping old precreate request\n",
1144                                ofd_obd(ofd)->obd_name);
1145                         GOTO(out, rc = 0);
1146                 }
1147                 /* only precreate if group == 0 and o_id is specfied */
1148                 if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) {
1149                         diff = 1; /* shouldn't we create this right now? */
1150                 } else {
1151                         diff = oa->o_id - ofd_last_id(ofd, oa->o_seq);
1152                 }
1153         }
1154         if (diff > 0) {
1155                 obd_id next_id = ofd_last_id(ofd, oa->o_seq) + 1;
1156                 int i;
1157
1158                 if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
1159                     !(oa->o_flags & OBD_FL_DELORPHAN)) {
1160                         /* don't enforce grant during orphan recovery */
1161                         rc = ofd_grant_create(env,
1162                                               ofd_obd(ofd)->obd_self_export,
1163                                               &diff);
1164                         if (rc) {
1165                                 CDEBUG(D_HA, "%s: failed to acquire grant space"
1166                                        "for precreate (%d)\n",
1167                                        ofd_obd(ofd)->obd_name, diff);
1168                                 diff = 0;
1169                         }
1170                 }
1171
1172                 CDEBUG(D_HA,
1173                        "%s: reserve %d objects in group "LPU64" at "LPU64"\n",
1174                        ofd_obd(ofd)->obd_name, diff, oa->o_seq, next_id);
1175                 for (i = 0; i < diff; i++) {
1176                         rc = ofd_precreate_object(env, ofd, next_id + i,
1177                                                   oa->o_seq);
1178                         if (rc)
1179                                 break;
1180                 }
1181                 if (i > 0) {
1182                         /* some objects got created, we can return
1183                          * them, even if last creation failed */
1184                         oa->o_id = ofd_last_id(ofd, oa->o_seq);
1185                         rc = 0;
1186                 } else {
1187                         CERROR("unable to precreate: %d\n", rc);
1188                         oa->o_id = ofd_last_id(ofd, oa->o_seq);
1189                 }
1190
1191                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
1192
1193                 if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
1194                     !(oa->o_flags & OBD_FL_DELORPHAN))
1195                         ofd_grant_commit(env, ofd_obd(ofd)->obd_self_export,
1196                                          rc);
1197         }
1198
1199         ofd_info2oti(info, oti);
1200 out:
1201         cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]);
1202         if (rc == 0 && ea != NULL) {
1203                 struct lov_stripe_md *lsm = *ea;
1204
1205                 lsm->lsm_object_id = oa->o_id;
1206         }
1207         return rc;
1208 }
1209
1210 int ofd_getattr(const struct lu_env *env, struct obd_export *exp,
1211                 struct obd_info *oinfo)
1212 {
1213         struct ofd_device       *ofd = ofd_exp(exp);
1214         struct ofd_thread_info  *info;
1215         struct ofd_object       *fo;
1216         __u64                    curr_version;
1217         int                      rc = 0;
1218
1219         ENTRY;
1220
1221         info = ofd_info_init(env, exp);
1222
1223         fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0);
1224         rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq,
1225                            oinfo_capa(oinfo), CAPA_OPC_META_READ);
1226         if (rc)
1227                 GOTO(out, rc);
1228
1229         fo = ofd_object_find(env, ofd, &info->fti_fid);
1230         if (IS_ERR(fo))
1231                 GOTO(out, rc = PTR_ERR(fo));
1232         LASSERT(fo != NULL);
1233         rc = ofd_attr_get(env, fo, &info->fti_attr);
1234         oinfo->oi_oa->o_valid = OBD_MD_FLID;
1235         if (rc == 0)
1236                 obdo_from_la(oinfo->oi_oa, &info->fti_attr,
1237                              OFD_VALID_FLAGS | LA_UID | LA_GID);
1238
1239         /* Store object version in reply */
1240         curr_version = dt_version_get(env, ofd_object_child(fo));
1241         if ((__s64)curr_version != -EOPNOTSUPP) {
1242                 oinfo->oi_oa->o_valid |= OBD_MD_FLDATAVERSION;
1243                 oinfo->oi_oa->o_data_version = curr_version;
1244         }
1245         ofd_object_put(env, fo);
1246 out:
1247         RETURN(rc);
1248 }
1249
1250 static int ofd_sync(const struct lu_env *env, struct obd_export *exp,
1251                     struct obd_info *oinfo, obd_size start, obd_size end,
1252                     struct ptlrpc_request_set *set)
1253 {
1254         struct ofd_device       *ofd = ofd_exp(exp);
1255         struct ofd_thread_info  *info;
1256         struct ofd_object       *fo;
1257         int                      rc = 0;
1258
1259         ENTRY;
1260
1261         /* if no objid is specified, it means "sync whole filesystem" */
1262         if (oinfo->oi_oa == NULL || !(oinfo->oi_oa->o_valid & OBD_MD_FLID)) {
1263                 rc = dt_sync(env, ofd->ofd_osd);
1264                 GOTO(out, rc);
1265         }
1266
1267         info = ofd_info_init(env, exp);
1268         fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0);
1269
1270         rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq,
1271                            oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC);
1272         if (rc)
1273                 GOTO(out, rc);
1274
1275         fo = ofd_object_find(env, ofd, &info->fti_fid);
1276         if (IS_ERR(fo)) {
1277                 CERROR("%s: error finding object "DFID": rc = %ld\n",
1278                        exp->exp_obd->obd_name, PFID(&info->fti_fid),
1279                        PTR_ERR(fo));
1280                 GOTO(out, rc = PTR_ERR(fo));
1281         }
1282
1283         ofd_write_lock(env, fo);
1284         if (!ofd_object_exists(fo))
1285                 GOTO(unlock, rc = -ENOENT);
1286
1287         rc = dt_object_sync(env, ofd_object_child(fo));
1288         if (rc)
1289                 GOTO(unlock, rc);
1290
1291         oinfo->oi_oa->o_valid = OBD_MD_FLID;
1292         rc = ofd_attr_get(env, fo, &info->fti_attr);
1293         obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS);
1294         EXIT;
1295 unlock:
1296         ofd_write_unlock(env, fo);
1297         ofd_object_put(env, fo);
1298 out:
1299         return rc;
1300 }
1301
1302 int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1303                   void *karg, void *uarg)
1304 {
1305         struct lu_env            env;
1306         struct ofd_device       *ofd = ofd_exp(exp);
1307         struct obd_device       *obd = ofd_obd(ofd);
1308         int                      rc;
1309
1310         ENTRY;
1311
1312         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
1313         rc = lu_env_init(&env, LCT_LOCAL);
1314         if (rc)
1315                 RETURN(rc);
1316
1317         switch (cmd) {
1318         case OBD_IOC_ABORT_RECOVERY:
1319                 CERROR("%s: aborting recovery\n", obd->obd_name);
1320                 target_stop_recovery_thread(obd);
1321                 break;
1322         case OBD_IOC_SYNC:
1323                 CDEBUG(D_RPCTRACE, "syncing ost %s\n", obd->obd_name);
1324                 rc = dt_sync(&env, ofd->ofd_osd);
1325                 break;
1326         case OBD_IOC_SET_READONLY:
1327                 rc = dt_sync(&env, ofd->ofd_osd);
1328                 if (rc == 0)
1329                         rc = dt_ro(&env, ofd->ofd_osd);
1330                 break;
1331         default:
1332                 CERROR("%s: not supported cmd = %d\n", obd->obd_name, cmd);
1333                 rc = -ENOTTY;
1334         }
1335
1336         lu_env_fini(&env);
1337         RETURN(rc);
1338 }
1339
1340 static int ofd_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
1341 {
1342         int rc = 0;
1343
1344         ENTRY;
1345
1346         switch(stage) {
1347         case OBD_CLEANUP_EARLY:
1348                 break;
1349         case OBD_CLEANUP_EXPORTS:
1350                 target_cleanup_recovery(obd);
1351                 break;
1352         }
1353         RETURN(rc);
1354 }
1355
/**
 * Ping handler: nothing to do, always reports success.
 *
 * \param env - execution environment (unused)
 * \param exp - export the ping arrived on (unused)
 *
 * \retval 0 always
 */
static int ofd_ping(const struct lu_env *env,
                    struct obd_export *exp)
{
        /* no work is needed here to answer a ping */
        return 0;
}
1360
1361 static int ofd_health_check(const struct lu_env *env, struct obd_device *obd)
1362 {
1363         struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
1364         struct ofd_thread_info  *info;
1365 #ifdef USE_HEALTH_CHECK_WRITE
1366         struct thandle          *th;
1367 #endif
1368         int                      rc = 0;
1369
1370         info = ofd_info_init(env, NULL);
1371         rc = dt_statfs(env, ofd->ofd_osd, &info->fti_u.osfs);
1372         if (unlikely(rc))
1373                 GOTO(out, rc);
1374
1375         if (info->fti_u.osfs.os_state == OS_STATE_READONLY)
1376                 GOTO(out, rc = -EROFS);
1377
1378 #ifdef USE_HEALTH_CHECK_WRITE
1379         OBD_ALLOC(info->fti_buf.lb_buf, CFS_PAGE_SIZE);
1380         if (info->fti_buf.lb_buf == NULL)
1381                 GOTO(out, rc = -ENOMEM);
1382
1383         info->fti_buf.lb_len = CFS_PAGE_SIZE;
1384         info->fti_off = 0;
1385
1386         th = dt_trans_create(env, ofd->ofd_osd);
1387         if (IS_ERR(th))
1388                 GOTO(out, rc = PTR_ERR(th));
1389
1390         rc = dt_declare_record_write(env, ofd->ofd_health_check_file,
1391                                      info->fti_buf.lb_len, info->fti_off, th);
1392         if (rc == 0) {
1393                 th->th_sync = 1; /* sync IO is needed */
1394                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
1395                 if (rc == 0)
1396                         rc = dt_record_write(env, ofd->ofd_health_check_file,
1397                                              &info->fti_buf, &info->fti_off,
1398                                              th);
1399         }
1400         dt_trans_stop(env, ofd->ofd_osd, th);
1401
1402         OBD_FREE(info->fti_buf.lb_buf, CFS_PAGE_SIZE);
1403
1404         CDEBUG(D_INFO, "write 1 page synchronously for checking io rc %d\n",rc);
1405 #endif
1406 out:
1407         return !!rc;
1408 }
1409
1410 static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused,
1411                           enum obd_notify_event ev, void *data)
1412 {
1413         switch (ev) {
1414         case OBD_NOTIFY_CONFIG:
1415                 LASSERT(obd->obd_no_conn);
1416                 cfs_spin_lock(&obd->obd_dev_lock);
1417                 obd->obd_no_conn = 0;
1418                 cfs_spin_unlock(&obd->obd_dev_lock);
1419                 break;
1420         default:
1421                 CDEBUG(D_INFO, "%s: Unhandled notification %#x\n",
1422                        obd->obd_name, ev);
1423         }
1424         return 0;
1425 }
1426
/*
 * Quotacheck request handler. Currently a stub that does no work and
 * reports success.
 *
 * \param obd   - obd device associated with the ofd
 * \param exp   - export of the client
 * \param oqctl - obd_quotactl request to be processed
 *
 * \retval 0 always
 */
static int ofd_quotacheck(struct obd_device *obd,
                          struct obd_export *exp,
                          struct obd_quotactl *oqctl)
{
        /* nothing is checked here */
        return 0;
}
1439
/*
 * Quota control request handler, meant for consulting current usage/limit
 * and for configuring quota enforcement. Currently a stub that does no work
 * and reports success.
 *
 * \param obd   - obd device associated with the ofd
 * \param exp   - export of the client
 * \param oqctl - obd_quotactl request to be processed
 *
 * \retval 0 always
 */
static int ofd_quotactl(struct obd_device *obd,
                        struct obd_export *exp,
                        struct obd_quotactl *oqctl)
{
        /* the request is accepted without being acted upon */
        return 0;
}
1453
1454 struct obd_ops ofd_obd_ops = {
1455         .o_owner                = THIS_MODULE,
1456         .o_connect              = ofd_obd_connect,
1457         .o_reconnect            = ofd_obd_reconnect,
1458         .o_disconnect           = ofd_obd_disconnect,
1459         .o_set_info_async       = ofd_set_info_async,
1460         .o_get_info             = ofd_get_info,
1461         .o_create               = ofd_create,
1462         .o_statfs               = ofd_statfs,
1463         .o_setattr              = ofd_setattr,
1464         .o_preprw               = ofd_preprw,
1465         .o_commitrw             = ofd_commitrw,
1466         .o_destroy              = ofd_destroy,
1467         .o_init_export          = ofd_init_export,
1468         .o_destroy_export       = ofd_destroy_export,
1469         .o_postrecov            = ofd_obd_postrecov,
1470         .o_punch                = ofd_punch,
1471         .o_getattr              = ofd_getattr,
1472         .o_sync                 = ofd_sync,
1473         .o_iocontrol            = ofd_iocontrol,
1474         .o_precleanup           = ofd_precleanup,
1475         .o_ping                 = ofd_ping,
1476         .o_health_check         = ofd_health_check,
1477         .o_notify               = ofd_obd_notify,
1478         .o_quotactl             = ofd_quotactl,
1479         .o_quotacheck           = ofd_quotacheck,
1480 };