Whamcloud - gitweb
6135f0ff6fc4c18f8e46b8fa4c02cf961d4bcbd2
[fs/lustre-release.git] / lustre / ofd / ofd_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_obd.c
37  *
38  * Author: Andreas Dilger <adilger@whamcloud.com>
39  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
40  * Author: Mike Pershin <tappro@whamcloud.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_FILTER
44
45 #include "ofd_internal.h"
46 #include <obd_cksum.h>
47
48 static int ofd_export_stats_init(struct ofd_device *ofd,
49                                  struct obd_export *exp, void *client_nid)
50 {
51         struct obd_device       *obd = ofd_obd(ofd);
52         struct nid_stat         *stats;
53         int                      num_stats, i;
54         int                      rc, newnid = 0;
55
56         ENTRY;
57
58         if (obd_uuid_equals(&exp->exp_client_uuid, &obd->obd_uuid))
59                 /* Self-export gets no proc entry */
60                 RETURN(0);
61
62         rc = lprocfs_exp_setup(exp, client_nid, &newnid);
63         if (rc) {
64                 /* Mask error for already created
65                  * /proc entries */
66                 if (rc == -EALREADY)
67                         rc = 0;
68                 RETURN(rc);
69         }
70
71         if (newnid == 0)
72                 RETURN(0);
73
74         stats = exp->exp_nid_stats;
75         LASSERT(stats != NULL);
76
77         OBD_ALLOC(stats->nid_brw_stats, sizeof(struct brw_stats));
78         if (stats->nid_brw_stats == NULL)
79                 GOTO(clean, rc = -ENOMEM);
80
81         for (i = 0; i < BRW_LAST; i++)
82                 cfs_spin_lock_init(&stats->nid_brw_stats->hist[i].oh_lock);
83
84         rc = lprocfs_seq_create(stats->nid_proc, "brw_stats", 0644,
85                                 &ofd_per_nid_stats_fops, stats);
86         if (rc)
87                 CWARN("Error adding the brw_stats file\n");
88
89         num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) +
90                      LPROC_OFD_LAST - 1;
91
92         stats->nid_stats = lprocfs_alloc_stats(num_stats,
93                                                LPROCFS_STATS_FLAG_NOPERCPU);
94         if (stats->nid_stats == NULL)
95                 return -ENOMEM;
96
97         lprocfs_init_ops_stats(LPROC_OFD_LAST, stats->nid_stats);
98         lprocfs_counter_init(stats->nid_stats, LPROC_OFD_READ_BYTES,
99                              LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
100         lprocfs_counter_init(stats->nid_stats, LPROC_OFD_WRITE_BYTES,
101                              LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
102
103         rc = lprocfs_register_stats(stats->nid_proc, "stats",
104                                     stats->nid_stats);
105         if (rc)
106                 GOTO(clean, rc);
107
108         rc = lprocfs_nid_ldlm_stats_init(stats);
109         if (rc) {
110                 lprocfs_free_stats(&stats->nid_stats);
111                 GOTO(clean, rc);
112         }
113
114         RETURN(0);
115 clean:
116         return rc;
117 }
118
119 static int ofd_parse_connect_data(const struct lu_env *env,
120                                   struct obd_export *exp,
121                                   struct obd_connect_data *data)
122 {
123         struct ofd_device                *ofd = ofd_exp(exp);
124         struct filter_export_data        *fed = &exp->exp_filter_data;
125
126         if (!data)
127                 RETURN(0);
128
129         CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
130                " ocd_version: %x ocd_grant: %d ocd_index: %u\n",
131                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
132                data->ocd_connect_flags, data->ocd_version,
133                data->ocd_grant, data->ocd_index);
134
135         if (fed->fed_group != 0 && fed->fed_group != data->ocd_group) {
136                 CWARN("!!! This export (nid %s) used object group %d "
137                       "earlier; now it's trying to use group %d!  This could "
138                       "be a bug in the MDS. Please report to "
139                       "http://bugs.whamcloud.com/\n",
140                       obd_export_nid2str(exp), fed->fed_group,
141                       data->ocd_group);
142                 RETURN(-EPROTO);
143         }
144         fed->fed_group = data->ocd_group;
145
146         data->ocd_connect_flags &= OST_CONNECT_SUPPORTED;
147         exp->exp_connect_flags = data->ocd_connect_flags;
148         data->ocd_version = LUSTRE_VERSION_CODE;
149
150         /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */
151         if (data->ocd_connect_flags & OBD_CONNECT_MDS)
152                 CDEBUG(D_HA, "%s: Received MDS connection for group %u\n",
153                        exp->exp_obd->obd_name, data->ocd_group);
154         else if (data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN)
155                 RETURN(-EPROTO);
156
157         if (ofd_grant_param_supp(exp)) {
158                 exp->exp_filter_data.fed_pagesize = data->ocd_blocksize;
159                 /* ocd_{blocksize,inodespace} are log2 values */
160                 data->ocd_blocksize  = ofd->ofd_blockbits;
161                 data->ocd_inodespace = ofd->ofd_dt_conf.ddp_inodespace;
162                 /* ocd_grant_extent is in 1K blocks */
163                 data->ocd_grant_extent = ofd->ofd_dt_conf.ddp_grant_frag >> 10;
164         }
165
166         if (exp->exp_connect_flags & OBD_CONNECT_GRANT)
167                 data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant);
168
169         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
170                 struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd;
171                 int                    index = lsd->lsd_ost_index;
172
173                 if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) {
174                         /* this will only happen on the first connect */
175                         lsd->lsd_ost_index = data->ocd_index;
176                         lsd->lsd_feature_compat |= OBD_COMPAT_OST;
177                         /* sync is not needed here as lut_client_add will
178                          * set exp_need_sync flag */
179                         lut_server_data_update(env, &ofd->ofd_lut, 0);
180                 } else if (index != data->ocd_index) {
181                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
182                                            " %u doesn't match actual OST index"
183                                            " %u in last_rcvd file, bad "
184                                            "configuration?\n",
185                                            obd_export_nid2str(exp), index,
186                                            data->ocd_index);
187                         RETURN(-EBADF);
188                 }
189         }
190
191         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_SIZE)) {
192                 data->ocd_brw_size = 65536;
193         } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
194                 data->ocd_brw_size = min(data->ocd_brw_size,
195                               (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
196                 if (data->ocd_brw_size == 0) {
197                         CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
198                                " ocd_version: %x ocd_grant: %d ocd_index: %u "
199                                "ocd_brw_size is unexpectedly zero, "
200                                "network data corruption?"
201                                "Refusing connection of this client\n",
202                                exp->exp_obd->obd_name,
203                                exp->exp_client_uuid.uuid,
204                                exp, data->ocd_connect_flags, data->ocd_version,
205                                data->ocd_grant, data->ocd_index);
206                         RETURN(-EPROTO);
207                 }
208         }
209
210         if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) {
211                 __u32 cksum_types = data->ocd_cksum_types;
212
213                 /* The client set in ocd_cksum_types the checksum types it
214                  * supports. We have to mask off the algorithms that we don't
215                  * support */
216                 data->ocd_cksum_types &= cksum_types_supported();
217
218                 if (unlikely(data->ocd_cksum_types == 0)) {
219                         CERROR("%s: Connect with checksum support but no "
220                                "ocd_cksum_types is set\n",
221                                exp->exp_obd->obd_name);
222                         RETURN(-EPROTO);
223                 }
224
225                 CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
226                        "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp),
227                        cksum_types, data->ocd_cksum_types);
228         } else {
229                 /* This client does not support OBD_CONNECT_CKSUM
230                  * fall back to CRC32 */
231                 CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
232                        "OBD_CONNECT_CKSUM, CRC32 will be used\n",
233                        exp->exp_obd->obd_name, obd_export_nid2str(exp));
234         }
235
236         if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES)
237                 data->ocd_maxbytes = ofd->ofd_dt_conf.ddp_maxbytes;
238
239         RETURN(0);
240 }
241
242 static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp,
243                              struct obd_device *obd, struct obd_uuid *cluuid,
244                              struct obd_connect_data *data, void *localdata)
245 {
246         struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
247         int                      rc;
248
249         ENTRY;
250
251         if (exp == NULL || obd == NULL || cluuid == NULL)
252                 RETURN(-EINVAL);
253
254         rc = lu_env_refill((struct lu_env *)env);
255         if (rc != 0) {
256                 CERROR("Failure to refill session: '%d'\n", rc);
257                 RETURN(rc);
258         }
259
260         ofd_info_init(env, exp);
261         rc = ofd_parse_connect_data(env, exp, data);
262         if (rc == 0)
263                 ofd_export_stats_init(ofd, exp, localdata);
264
265         RETURN(rc);
266 }
267
268 static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp,
269                            struct obd_device *obd, struct obd_uuid *cluuid,
270                            struct obd_connect_data *data, void *localdata)
271 {
272         struct obd_export       *exp;
273         struct ofd_device       *ofd;
274         struct lustre_handle     conn = { 0 };
275         int                      rc, group;
276
277         ENTRY;
278
279         if (_exp == NULL || obd == NULL || cluuid == NULL)
280                 RETURN(-EINVAL);
281
282         ofd = ofd_dev(obd->obd_lu_dev);
283
284         rc = class_connect(&conn, obd, cluuid);
285         if (rc)
286                 RETURN(rc);
287
288         exp = class_conn2export(&conn);
289         LASSERT(exp != NULL);
290
291         rc = lu_env_refill((struct lu_env *)env);
292         if (rc != 0) {
293                 CERROR("Failure to refill session: '%d'\n", rc);
294                 GOTO(out, rc);
295         }
296
297         ofd_info_init(env, exp);
298
299         rc = ofd_parse_connect_data(env, exp, data);
300         if (rc)
301                 GOTO(out, rc);
302
303         group = data->ocd_group;
304         if (obd->obd_replayable) {
305                 struct tg_export_data *ted = &exp->exp_target_data;
306
307                 memcpy(ted->ted_lcd->lcd_uuid, cluuid,
308                        sizeof(ted->ted_lcd->lcd_uuid));
309                 rc = lut_client_new(env, exp);
310                 if (rc != 0)
311                         GOTO(out, rc);
312                 ofd_export_stats_init(ofd, exp, localdata);
313         }
314         if (group == 0)
315                 GOTO(out, rc = 0);
316
317         /* init new group */
318         if (group > ofd->ofd_max_group) {
319                 ofd->ofd_max_group = group;
320                 rc = ofd_group_load(env, ofd, group);
321         }
322 out:
323         if (rc != 0) {
324                 class_disconnect(exp);
325                 *_exp = NULL;
326         } else {
327                 *_exp = exp;
328         }
329         RETURN(rc);
330 }
331
332 static int ofd_obd_disconnect(struct obd_export *exp)
333 {
334         struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
335         struct lu_env            env;
336         int                      rc;
337
338         ENTRY;
339
340         LASSERT(exp);
341         class_export_get(exp);
342
343         if (!(exp->exp_flags & OBD_OPT_FORCE))
344                 ofd_grant_sanity_check(ofd_obd(ofd), __FUNCTION__);
345
346         rc = server_disconnect_export(exp);
347
348         ofd_grant_discard(exp);
349
350         rc = lu_env_init(&env, LCT_DT_THREAD);
351         if (rc)
352                 RETURN(rc);
353
354         /* Do not erase record for recoverable client. */
355         if (exp->exp_obd->obd_replayable &&
356             (!exp->exp_obd->obd_fail || exp->exp_failed))
357                 lut_client_del(&env, exp);
358         lu_env_fini(&env);
359
360         class_export_put(exp);
361         RETURN(rc);
362 }
363
364 static int ofd_init_export(struct obd_export *exp)
365 {
366         int rc;
367
368         cfs_spin_lock_init(&exp->exp_filter_data.fed_lock);
369         CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
370         cfs_spin_lock(&exp->exp_lock);
371         exp->exp_connecting = 1;
372         cfs_spin_unlock(&exp->exp_lock);
373
374         /* self-export doesn't need client data and ldlm initialization */
375         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
376                                      &exp->exp_client_uuid)))
377                 return 0;
378
379         rc = lut_client_alloc(exp);
380         if (rc == 0)
381                 ldlm_init_export(exp);
382         if (rc)
383                 CERROR("%s: Can't initialize export: rc %d\n",
384                        exp->exp_obd->obd_name, rc);
385         return rc;
386 }
387
388 static int ofd_destroy_export(struct obd_export *exp)
389 {
390         struct ofd_device *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
391
392         if (exp->exp_filter_data.fed_pending)
393                 CERROR("%s: cli %s/%p has %lu pending on destroyed export"
394                        "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
395                        exp, exp->exp_filter_data.fed_pending);
396
397         target_destroy_export(exp);
398
399         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
400                                      &exp->exp_client_uuid)))
401                 return 0;
402
403         ldlm_destroy_export(exp);
404         lut_client_free(exp);
405
406         ofd_fmd_cleanup(exp);
407
408         /*
409          * discard grants once we're sure no more
410          * interaction with the client is possible
411          */
412         ofd_grant_discard(exp);
413         ofd_fmd_cleanup(exp);
414
415         if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) {
416                 if (ofd->ofd_tot_granted_clients > 0)
417                         ofd->ofd_tot_granted_clients --;
418         }
419
420         if (!(exp->exp_flags & OBD_OPT_FORCE))
421                 ofd_grant_sanity_check(exp->exp_obd, __FUNCTION__);
422
423         LASSERT(cfs_list_empty(&exp->exp_filter_data.fed_mod_list));
424         return 0;
425 }
426
427 int ofd_obd_postrecov(struct obd_device *obd)
428 {
429         struct lu_env            env;
430         struct lu_device        *ldev = obd->obd_lu_dev;
431         int                      rc;
432
433         ENTRY;
434
435         rc = lu_env_init(&env, LCT_DT_THREAD);
436         if (rc)
437                 RETURN(rc);
438         ofd_info_init(&env, obd->obd_self_export);
439
440         rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev);
441         lu_env_fini(&env);
442         RETURN(rc);
443 }
444
445 static int ofd_set_mds_conn(struct obd_export *exp, void *val)
446 {
447         int rc = 0;
448
449         ENTRY;
450
451         LCONSOLE_WARN("%s: received MDS connection from %s\n",
452                       exp->exp_obd->obd_name, obd_export_nid2str(exp));
453         RETURN(rc);
454 }
455
456 static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp,
457                               __u32 keylen, void *key, __u32 vallen, void *val,
458                               struct ptlrpc_request_set *set)
459 {
460         struct ofd_device       *ofd = ofd_exp(exp);
461         int                      rc = 0;
462
463         ENTRY;
464
465         if (exp->exp_obd == NULL) {
466                 CDEBUG(D_IOCTL, "invalid export %p\n", exp);
467                 RETURN(-EINVAL);
468         }
469
470         if (KEY_IS(KEY_CAPA_KEY)) {
471                 rc = ofd_update_capa_key(ofd, val);
472                 if (rc)
473                         CERROR("ofd update capability key failed: %d\n", rc);
474         } else if (KEY_IS(KEY_MDS_CONN)) {
475                 rc = ofd_set_mds_conn(exp, val);
476         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
477                 struct ost_body *body = val;
478
479                 /** handle grant shrink, similar to a read request */
480                 ofd_grant_prepare_read(env, exp, &body->oa);
481         } else {
482                 CERROR("%s: Unsupported key %s\n",
483                        exp->exp_obd->obd_name, (char*)key);
484                 rc = -EOPNOTSUPP;
485         }
486
487         RETURN(rc);
488 }
489
490 static int ofd_get_info(const struct lu_env *env, struct obd_export *exp,
491                         __u32 keylen, void *key, __u32 *vallen, void *val,
492                         struct lov_stripe_md *lsm)
493 {
494         struct ofd_device       *ofd = ofd_exp(exp);
495         int                      rc = 0;
496
497         ENTRY;
498
499         if (exp->exp_obd == NULL) {
500                 CDEBUG(D_IOCTL, "invalid client export %p\n", exp);
501                 RETURN(-EINVAL);
502         }
503
504         if (KEY_IS(KEY_BLOCKSIZE)) {
505                 __u32 *blocksize = val;
506                 if (blocksize) {
507                         if (*vallen < sizeof(*blocksize))
508                                 RETURN(-EOVERFLOW);
509                         *blocksize = 1 << ofd->ofd_dt_conf.ddp_block_shift;
510                 }
511                 *vallen = sizeof(*blocksize);
512         } else if (KEY_IS(KEY_BLOCKSIZE_BITS)) {
513                 __u32 *blocksize_bits = val;
514                 if (blocksize_bits) {
515                         if (*vallen < sizeof(*blocksize_bits))
516                                 RETURN(-EOVERFLOW);
517                         *blocksize_bits = ofd->ofd_dt_conf.ddp_block_shift;
518                 }
519                 *vallen = sizeof(*blocksize_bits);
520         } else if (KEY_IS(KEY_LAST_ID)) {
521                 obd_id *last_id = val;
522                 if (last_id) {
523                         if (*vallen < sizeof(*last_id))
524                                 RETURN(-EOVERFLOW);
525                         *last_id = ofd_last_id(ofd,
526                                                exp->exp_filter_data.fed_group);
527                 }
528                 *vallen = sizeof(*last_id);
529         } else {
530                 CERROR("Not supported key %s\n", (char*)key);
531                 rc = -EOPNOTSUPP;
532         }
533
534         RETURN(rc);
535 }
536
537 /** helper function for statfs, also used by grant code */
538 int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
539                         struct obd_statfs *osfs, __u64 max_age, int *from_cache)
540 {
541         int rc;
542
543         cfs_spin_lock(&ofd->ofd_osfs_lock);
544         if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) {
545                 obd_size unstable;
546
547                 /* statfs data are too old, get up-to-date one.
548                  * we must be cautious here since multiple threads might be
549                  * willing to update statfs data concurrently and we must
550                  * grant that cached statfs data are always consistent */
551
552                 if (ofd->ofd_statfs_inflight == 0)
553                         /* clear inflight counter if no users, although it would
554                          * take a while to overflow this 64-bit counter ... */
555                         ofd->ofd_osfs_inflight = 0;
556                 /* notify ofd_grant_commit() that we want to track writes
557                  * completed as of now */
558                 ofd->ofd_statfs_inflight++;
559                 /* record value of inflight counter before running statfs to
560                  * compute the diff once statfs is completed */
561                 unstable = ofd->ofd_osfs_inflight;
562                 cfs_spin_unlock(&ofd->ofd_osfs_lock);
563
564                 /* statfs can sleep ... hopefully not for too long since we can
565                  * call it fairly often as space fills up */
566                 rc = dt_statfs(env, ofd->ofd_osd, osfs);
567                 if (unlikely(rc))
568                         return rc;
569
570                 cfs_spin_lock(&ofd->ofd_grant_lock);
571                 cfs_spin_lock(&ofd->ofd_osfs_lock);
572                 /* calculate how much space was written while we released the
573                  * ofd_osfs_lock */
574                 unstable = ofd->ofd_osfs_inflight - unstable;
575                 ofd->ofd_osfs_unstable = 0;
576                 if (unstable) {
577                         /* some writes completed while we were running statfs
578                          * w/o the ofd_osfs_lock. Those ones got added to
579                          * the cached statfs data that we are about to crunch.
580                          * Take them into account in the new statfs data */
581                         osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
582                                                unstable >> ofd->ofd_blockbits);
583                         /* However, we don't really know if those writes got
584                          * accounted in the statfs call, so tell
585                          * ofd_grant_space_left() there is some uncertainty
586                          * on the accounting of those writes.
587                          * The purpose is to prevent spurious error messages in
588                          * ofd_grant_space_left() since those writes might be
589                          * accounted twice. */
590                         ofd->ofd_osfs_unstable += unstable;
591                 }
592                 /* similarly, there is some uncertainty on write requests
593                  * between prepare & commit */
594                 ofd->ofd_osfs_unstable += ofd->ofd_tot_pending;
595                 cfs_spin_unlock(&ofd->ofd_grant_lock);
596
597                 /* finally udpate cached statfs data */
598                 ofd->ofd_osfs = *osfs;
599                 ofd->ofd_osfs_age = cfs_time_current_64();
600
601                 ofd->ofd_statfs_inflight--; /* stop tracking */
602                 if (ofd->ofd_statfs_inflight == 0)
603                         ofd->ofd_osfs_inflight = 0;
604                 cfs_spin_unlock(&ofd->ofd_osfs_lock);
605
606                 if (from_cache)
607                         *from_cache = 0;
608         } else {
609                 /* use cached statfs data */
610                 *osfs = ofd->ofd_osfs;
611                 cfs_spin_unlock(&ofd->ofd_osfs_lock);
612                 if (from_cache)
613                         *from_cache = 1;
614         }
615         return 0;
616 }
617
618 static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
619                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
620 {
621         struct obd_device       *obd = class_exp2obd(exp);
622         struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
623         int                      rc;
624
625         ENTRY;
626
627         rc = ofd_statfs_internal(env, ofd, osfs, max_age, NULL);
628         if (unlikely(rc))
629                 GOTO(out, rc);
630
631         /* at least try to account for cached pages.  its still racy and
632          * might be under-reporting if clients haven't announced their
633          * caches with brw recently */
634
635         CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
636                " pending "LPU64" free "LPU64" avail "LPU64"\n",
637                ofd->ofd_tot_dirty, ofd->ofd_tot_granted, ofd->ofd_tot_pending,
638                osfs->os_bfree << ofd->ofd_blockbits,
639                osfs->os_bavail << ofd->ofd_blockbits);
640
641         osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
642                                  ((ofd->ofd_tot_dirty + ofd->ofd_tot_pending +
643                                    osfs->os_bsize - 1) >> ofd->ofd_blockbits));
644
645         /* The QoS code on the MDS does not care about space reserved for
646          * precreate, so take it out. */
647         if (exp->exp_connect_flags & OBD_CONNECT_MDS) {
648                 struct filter_export_data *fed;
649
650                 fed = &obd->obd_self_export->exp_filter_data;
651                 osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
652                                          fed->fed_grant >> ofd->ofd_blockbits);
653         }
654
655         ofd_grant_sanity_check(obd, __FUNCTION__);
656         CDEBUG(D_CACHE, LPU64" blocks: "LPU64" free, "LPU64" avail; "
657                LPU64" objects: "LPU64" free; state %x\n",
658                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
659                osfs->os_files, osfs->os_ffree, osfs->os_state);
660
661         if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC,
662                                  ofd->ofd_lut.lut_lsd.lsd_ost_index))
663                 osfs->os_bfree = osfs->os_bavail = 2;
664
665         if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOINO,
666                                  ofd->ofd_lut.lut_lsd.lsd_ost_index))
667                 osfs->os_ffree = 0;
668
669         /* OS_STATE_READONLY can be set by OSD already */
670         if (ofd->ofd_raid_degraded)
671                 osfs->os_state |= OS_STATE_DEGRADED;
672
673         if (obd->obd_self_export != exp && ofd_grant_compat(exp, ofd)) {
674                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
675                  * should not see a block size > page size, otherwise
676                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
677                  * block size which is the biggest block size known to work
678                  * with all client's page size. */
679                 osfs->os_blocks <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
680                 osfs->os_bfree  <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
681                 osfs->os_bavail <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
682                 osfs->os_bsize    = 1 << COMPAT_BSIZE_SHIFT;
683         }
684
685         EXIT;
686 out:
687         return rc;
688 }
689
690 static int ofd_sync(const struct lu_env *env, struct obd_export *exp,
691                     struct obd_info *oinfo, obd_size start, obd_size end,
692                     struct ptlrpc_request_set *set)
693 {
694         struct ofd_device       *ofd = ofd_exp(exp);
695         int                      rc = 0;
696
697         ENTRY;
698
699         /* if no objid is specified, it means "sync whole filesystem" */
700         if (oinfo->oi_oa == NULL || !(oinfo->oi_oa->o_valid & OBD_MD_FLID)) {
701                 rc = dt_sync(env, ofd->ofd_osd);
702                 GOTO(out, rc);
703         }
704
705         EXIT;
706 out:
707         return rc;
708 }
709
710 int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
711                   void *karg, void *uarg)
712 {
713         struct lu_env            env;
714         struct ofd_device       *ofd = ofd_exp(exp);
715         struct obd_device       *obd = ofd_obd(ofd);
716         int                      rc;
717
718         ENTRY;
719
720         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
721         rc = lu_env_init(&env, LCT_LOCAL);
722         if (rc)
723                 RETURN(rc);
724
725         switch (cmd) {
726         case OBD_IOC_ABORT_RECOVERY:
727                 CERROR("aborting recovery for device %s\n", obd->obd_name);
728                 target_stop_recovery_thread(obd);
729                 break;
730         case OBD_IOC_SYNC:
731                 CDEBUG(D_RPCTRACE, "syncing ost %s\n", obd->obd_name);
732                 rc = dt_sync(&env, ofd->ofd_osd);
733                 break;
734         case OBD_IOC_SET_READONLY:
735                 rc = dt_sync(&env, ofd->ofd_osd);
736                 if (rc == 0)
737                         rc = dt_ro(&env, ofd->ofd_osd);
738                 break;
739         default:
740                 CERROR("Not supported cmd = %d for device %s\n",
741                        cmd, obd->obd_name);
742                 rc = -ENOTTY;
743         }
744
745         lu_env_fini(&env);
746         RETURN(rc);
747 }
748
749 static int ofd_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
750 {
751         int rc = 0;
752
753         ENTRY;
754
755         switch(stage) {
756         case OBD_CLEANUP_EARLY:
757                 break;
758         case OBD_CLEANUP_EXPORTS:
759                 target_cleanup_recovery(obd);
760                 break;
761         }
762         RETURN(rc);
763 }
764
/**
 * Ping handler of the OFD device: nothing to do, always succeeds.
 *
 * \param env execution environment, unused
 * \param exp export the ping arrived on, unused
 *
 * \retval 0 always
 */
static int ofd_ping(const struct lu_env *env, struct obd_export *exp)
{
        const int rc = 0;

        return rc;
}
769
770 static int ofd_health_check(const struct lu_env *env, struct obd_device *obd)
771 {
772         struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
773         struct ofd_thread_info  *info;
774 #ifdef USE_HEALTH_CHECK_WRITE
775         struct thandle          *th;
776 #endif
777         int                      rc = 0;
778
779         info = ofd_info_init(env, NULL);
780         rc = dt_statfs(env, ofd->ofd_osd, &info->fti_u.osfs);
781         if (unlikely(rc))
782                 GOTO(out, rc);
783
784         if (info->fti_u.osfs.os_state == OS_STATE_READONLY)
785                 GOTO(out, rc = -EROFS);
786
787 #ifdef USE_HEALTH_CHECK_WRITE
788         OBD_ALLOC(info->fti_buf.lb_buf, CFS_PAGE_SIZE);
789         if (info->fti_buf.lb_buf == NULL)
790                 GOTO(out, rc = -ENOMEM);
791
792         info->fti_buf.lb_len = CFS_PAGE_SIZE;
793         info->fti_off = 0;
794
795         th = dt_trans_create(env, ofd->ofd_osd);
796         if (IS_ERR(th))
797                 GOTO(out, rc = PTR_ERR(th));
798
799         rc = dt_declare_record_write(env, ofd->ofd_health_check_file,
800                                      info->fti_buf.lb_len, info->fti_off, th);
801         if (rc == 0) {
802                 th->th_sync = 1; /* sync IO is needed */
803                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
804                 if (rc == 0)
805                         rc = dt_record_write(env, ofd->ofd_health_check_file,
806                                              &info->fti_buf, &info->fti_off,
807                                              th);
808         }
809         dt_trans_stop(env, ofd->ofd_osd, th);
810
811         OBD_FREE(info->fti_buf.lb_buf, CFS_PAGE_SIZE);
812
813         CDEBUG(D_INFO, "write 1 page synchronously for checking io rc %d\n",rc);
814 #endif
815 out:
816         return !!rc;
817 }
818
819 static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused,
820                           enum obd_notify_event ev, void *data)
821 {
822         switch (ev) {
823         case OBD_NOTIFY_CONFIG:
824                 LASSERT(obd->obd_no_conn);
825                 cfs_spin_lock(&obd->obd_dev_lock);
826                 obd->obd_no_conn = 0;
827                 cfs_spin_unlock(&obd->obd_dev_lock);
828                 break;
829         default:
830                 CDEBUG(D_INFO, "%s: Unhandled notification %#x\n",
831                        obd->obd_name, ev);
832         }
833         return 0;
834 }
835
836 struct obd_ops ofd_obd_ops = {
837         .o_owner                = THIS_MODULE,
838         .o_connect              = ofd_obd_connect,
839         .o_reconnect            = ofd_obd_reconnect,
840         .o_disconnect           = ofd_obd_disconnect,
841         .o_set_info_async       = ofd_set_info_async,
842         .o_get_info             = ofd_get_info,
843         .o_statfs               = ofd_statfs,
844         .o_init_export          = ofd_init_export,
845         .o_destroy_export       = ofd_destroy_export,
846         .o_postrecov            = ofd_obd_postrecov,
847         .o_sync                 = ofd_sync,
848         .o_iocontrol            = ofd_iocontrol,
849         .o_precleanup           = ofd_precleanup,
850         .o_ping                 = ofd_ping,
851         .o_health_check         = ofd_health_check,
852         .o_notify               = ofd_obd_notify,
853 };