/* Whamcloud gitweb extraction header (not part of the original source):
 * b=21587 don't LBUG if transno has changed during replay
 * [fs/lustre-release.git] / lustre / osc / osc_request.c */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
76
77 /* by default 10s */
78 atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Follows the obd unpackmd convention:
 *   lsmp == NULL            : return the in-memory lsm size only
 *   *lsmp != NULL, lmm NULL : free the existing lsm
 *   otherwise               : (allocate and) fill *lsmp from lmm
 * Returns the lsm size on success, 0 on free, negative errno on error. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        /* Validate the on-disk metadata before touching *lsmp. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* The OSC is always single-striped from the LOV's point of view. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: existing lsm, nothing to unpack. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* don't leak the lsm on partial allocation failure */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
165
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167                                  void *data, int rc)
168 {
169         struct ost_body *body;
170         struct osc_async_args *aa = data;
171         ENTRY;
172
173         if (rc != 0)
174                 GOTO(out, rc);
175
176         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
177                                   lustre_swab_ost_body);
178         if (body) {
179                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
181
182                 /* This should really be sent by the OST */
183                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
184                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
185         } else {
186                 CERROR("can't unpack ost_body\n");
187                 rc = -EPROTO;
188                 aa->aa_oi->oi_oa->o_valid = 0;
189         }
190 out:
191         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
192         RETURN(rc);
193 }
194
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196                              struct ptlrpc_request_set *set)
197 {
198         struct ptlrpc_request *req;
199         struct ost_body *body;
200         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201         struct osc_async_args *aa;
202         ENTRY;
203
204         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205                               OST_GETATTR, 2, size,NULL);
206         if (!req)
207                 RETURN(-ENOMEM);
208
209         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
211
212         ptlrpc_req_set_repsize(req, 2, size);
213         req->rq_interpret_reply = osc_getattr_interpret;
214
215         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216         aa = ptlrpc_req_async_args(req);
217         aa->aa_oi = oinfo;
218
219         ptlrpc_set_add_req(set, req);
220         RETURN (0);
221 }
222
223 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
224 {
225         struct ptlrpc_request *req;
226         struct ost_body *body;
227         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
228         int rc;
229         ENTRY;
230
231         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
232                               OST_GETATTR, 2, size, NULL);
233         if (!req)
234                 RETURN(-ENOMEM);
235
236         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
237         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
238
239         ptlrpc_req_set_repsize(req, 2, size);
240
241         rc = ptlrpc_queue_wait(req);
242         if (rc) {
243                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
244                 GOTO(out, rc);
245         }
246
247         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
248                                   lustre_swab_ost_body);
249         if (body == NULL) {
250                 CERROR ("can't unpack ost_body\n");
251                 GOTO (out, rc = -EPROTO);
252         }
253
254         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
255         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
256
257         /* This should really be sent by the OST */
258         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
259         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
260
261         EXIT;
262  out:
263         ptlrpc_req_finished(req);
264         return rc;
265 }
266
267 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
268                        struct obd_trans_info *oti)
269 {
270         struct ptlrpc_request *req;
271         struct ost_body *body;
272         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
273         int rc;
274         ENTRY;
275
276         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
277                               OST_SETATTR, 2, size, NULL);
278         if (!req)
279                 RETURN(-ENOMEM);
280
281         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
282         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
283
284         ptlrpc_req_set_repsize(req, 2, size);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
291                                   lustre_swab_ost_body);
292         if (body == NULL)
293                 GOTO(out, rc = -EPROTO);
294
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         EXIT;
298 out:
299         ptlrpc_req_finished(req);
300         RETURN(rc);
301 }
302
303 static int osc_setattr_interpret(struct ptlrpc_request *req,
304                                  void *data, int rc)
305 {
306         struct ost_body *body;
307         struct osc_async_args *aa = data;
308         ENTRY;
309
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
314                                   lustre_swab_ost_body);
315         if (body == NULL) {
316                 CERROR("can't unpack ost_body\n");
317                 GOTO(out, rc = -EPROTO);
318         }
319
320         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
321 out:
322         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
323         RETURN(rc);
324 }
325
326 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
327                              struct obd_trans_info *oti,
328                              struct ptlrpc_request_set *rqset)
329 {
330         struct ptlrpc_request *req;
331         struct ost_body *body;
332         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
333         int bufcount = 2;
334         struct osc_async_args *aa;
335         ENTRY;
336
337         if (osc_exp_is_2_0_server(exp)) {
338                 bufcount = 3;
339         }
340
341         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
342                               OST_SETATTR, bufcount, size, NULL);
343         if (!req)
344                 RETURN(-ENOMEM);
345
346         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
347
348         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
349                 LASSERT(oti);
350                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
351         }
352
353         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
354         ptlrpc_req_set_repsize(req, 2, size);
355         /* do mds to ost setattr asynchronouly */
356         if (!rqset) {
357                 /* Do not wait for response. */
358                 ptlrpcd_add_req(req);
359         } else {
360                 req->rq_interpret_reply = osc_setattr_interpret;
361
362                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
363                 aa = ptlrpc_req_async_args(req);
364                 aa->aa_oi = oinfo;
365
366                 ptlrpc_set_add_req(rqset, req);
367         }
368
369         RETURN(0);
370 }
371
/* Create an object on the OST via OST_CREATE.
 *
 * @oa  in/out obdo; on success it carries the new object id and the
 *      attributes returned by the OST.
 * @ea  in/out stripe md pointer; if *ea is NULL one is allocated here
 *      (and freed again on failure).
 * @oti optional transaction info; filled with the reply transno and,
 *      when the OST returned one, the llog cookie.
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* Allocate an lsm only if the caller did not provide one; it is
         * freed in the error path below (when *ea is still NULL). */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_req_set_repsize(req, 2, size);
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* stash the llog unlink cookie for later cancellation */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* only free the lsm if we allocated it above (*ea still NULL) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
452
453 static int osc_punch_interpret(struct ptlrpc_request *req,
454                                void *data, int rc)
455 {
456         struct ost_body *body;
457         struct osc_async_args *aa = data;
458         ENTRY;
459
460         if (rc != 0)
461                 GOTO(out, rc);
462
463         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
464                                   lustre_swab_ost_body);
465         if (body == NULL) {
466                 CERROR ("can't unpack ost_body\n");
467                 GOTO(out, rc = -EPROTO);
468         }
469
470         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
471 out:
472         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
473         RETURN(rc);
474 }
475
476 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
477                      struct obd_trans_info *oti,
478                      struct ptlrpc_request_set *rqset)
479 {
480         struct ptlrpc_request *req;
481         struct osc_async_args *aa;
482         struct ost_body *body;
483         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
484         ENTRY;
485
486         if (!oinfo->oi_oa) {
487                 CERROR("oa NULL\n");
488                 RETURN(-EINVAL);
489         }
490
491         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
492                               OST_PUNCH, 2, size, NULL);
493         if (!req)
494                 RETURN(-ENOMEM);
495
496         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
497         ptlrpc_at_set_req_timeout(req);
498
499         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
500         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
501
502         /* overload the size and blocks fields in the oa with start/end */
503         body->oa.o_size = oinfo->oi_policy.l_extent.start;
504         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
505         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
506
507         ptlrpc_req_set_repsize(req, 2, size);
508
509         req->rq_interpret_reply = osc_punch_interpret;
510         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
511         aa = ptlrpc_req_async_args(req);
512         aa->aa_oi = oinfo;
513         ptlrpc_set_add_req(rqset, req);
514
515         RETURN(0);
516 }
517
518 static int osc_sync_interpret(struct ptlrpc_request *req,
519                               void *data, int rc)
520 {
521         struct ost_body *body;
522         struct osc_async_args *aa = data;
523         ENTRY;
524
525         if (rc)
526                 GOTO(out, rc);
527
528         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529                                   lustre_swab_ost_body);
530         if (body == NULL) {
531                 CERROR ("can't unpack ost_body\n");
532                 GOTO(out, rc = -EPROTO);
533         }
534
535         *aa->aa_oi->oi_oa = body->oa;
536 out:
537         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
538         RETURN(rc);
539 }
540
541 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
542                     obd_size start, obd_size end,
543                     struct ptlrpc_request_set *set)
544 {
545         struct ptlrpc_request *req;
546         struct ost_body *body;
547         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
548         struct osc_async_args *aa;
549         ENTRY;
550
551         if (!oinfo->oi_oa) {
552                 CERROR("oa NULL\n");
553                 RETURN(-EINVAL);
554         }
555
556         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
557                               OST_SYNC, 2, size, NULL);
558         if (!req)
559                 RETURN(-ENOMEM);
560
561         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
562         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
563
564         /* overload the size and blocks fields in the oa with start/end */
565         body->oa.o_size = start;
566         body->oa.o_blocks = end;
567         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
568
569         ptlrpc_req_set_repsize(req, 2, size);
570         req->rq_interpret_reply = osc_sync_interpret;
571
572         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
573         aa = ptlrpc_req_async_args(req);
574         aa->aa_oi = oinfo;
575
576         ptlrpc_set_add_req(set, req);
577         RETURN (0);
578 }
579
580 /* Find and cancel locally locks matched by @mode in the resource found by
581  * @objid. Found locks are added into @cancel list. Returns the amount of
582  * locks added to @cancels list. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584                                    struct list_head *cancels, ldlm_mode_t mode,
585                                    int lock_flags)
586 {
587         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588         struct ldlm_res_id res_id;
589         struct ldlm_resource *res;
590         int count;
591         ENTRY;
592
593         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
595         if (res == NULL)
596                 RETURN(0);
597
598         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599                                            lock_flags, 0, NULL);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
/* Completion callback for throttled OST_DESTROY requests: drop the
 * in-flight count and wake any sender blocked in osc_destroy(). */
static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}
613
/* Try to reserve an in-flight slot for a destroy RPC.
 * Returns 1 if the slot was taken (caller may send), 0 otherwise.
 * Uses inc-then-test so two racing callers cannot both slip under the
 * cl_max_rpcs_in_flight limit. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: undo our increment.  If the count dropped below
         * the limit in the meantime, someone may be waiting on a slot we
         * effectively just freed, so wake the queue. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
631
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                        sizeof(struct ldlm_request) };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        LASSERT(oa->o_id != 0);

        /* Cancel our PW locks on the object locally; discard dirty data
         * since the object is going away anyway. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp))
                bufcount = 3;
        /* Piggy-back the gathered cancels on the destroy request (ELC). */
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        if (!req)
                /* NOTE(review): on this path the locks collected in
                 * "cancels" do not appear to be released here -- confirm
                 * whether ldlm_prep_elc_req() consumes the list on
                 * failure. */
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* carry the llog cookie so the OST can cancel the unlink record */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                oa->o_lcookie = *oti->oti_logcookies;
        }

        lustre_set_wire_obdo(&body->oa, oa);
        ptlrpc_req_set_repsize(req, 2, size);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
703
/* Fill oa with this client's dirty/grant accounting so the OST can see
 * our cache state; piggy-backed on BRW requests.  Also returns lost
 * grant to the server and resets cl_lost_grant. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* o_undirty tells the OST how much more we could still dirty;
         * report 0 if our accounting looks inconsistent. */
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
                /* The atomic_read() allowing the atomic_inc() are not covered
                 * by a lock thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported exactly once */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
742
/* Push the next grant-shrink deadline one interval into the future. */
static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
750
/* Charge one page of dirty cache against the global and per-OSC limits
 * and consume one page worth of grant for @pga.
 * caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* mark the page so osc_release_write_grant() knows to undo this */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
                 cli->cl_avail_grant);
        osc_update_next_shrink(cli);
}
764
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 * @sent: non-zero when the page was actually written to the OST; when 0
 * the whole page's grant is recorded as lost so osc_announce_cached()
 * can report it back to the server. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* nothing to undo if the page never consumed grant */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                /* round the write up to a full filesystem block */
                if (end)
                        count += blocksize - end;

                /* the block-rounded remainder of the page is lost grant */
                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
803
/* Total BRW RPCs (read + write) currently in flight for this client. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
808
/* Wake threads queued on cl_cache_waiters waiting for dirty room/grant.
 * Each waiter either receives grant (consumed on its behalf here) or is
 * told to fall back to sync I/O via ocw_rc = -EDQUOT.
 * caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                /* dequeue this waiter before waking it */
                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
850
851 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
852 {
853         client_obd_list_lock(&cli->cl_loi_list_lock);
854         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
855         if (body->oa.o_valid & OBD_MD_FLGRANT)
856                 cli->cl_avail_grant += body->oa.o_grant;
857         /* waiters are woken in brw_interpret */
858         client_obd_list_unlock(&cli->cl_loi_list_lock);
859 }
860
861 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
862                               void *key, obd_count vallen, void *val,
863                               struct ptlrpc_request_set *set);
864
865 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
866                                       void *data, int rc)
867 {
868         struct osc_grant_args *aa = data;
869         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870         struct obdo *oa = aa->aa_oa;
871         struct ost_body *body;
872
873         if (rc != 0) {
874                 client_obd_list_lock(&cli->cl_loi_list_lock);
875                 cli->cl_avail_grant += oa->o_grant;
876                 client_obd_list_unlock(&cli->cl_loi_list_lock);
877                 GOTO(out, rc);
878         }
879         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
880                                 lustre_swab_ost_body);
881         osc_update_grant(cli, body);
882 out:
883         OBDO_FREE(oa);
884         return rc;
885 }
886
887 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
888 {
889         client_obd_list_lock(&cli->cl_loi_list_lock);
890         oa->o_grant = cli->cl_avail_grant / 4;
891         cli->cl_avail_grant -= oa->o_grant;
892         client_obd_list_unlock(&cli->cl_loi_list_lock);
893         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
894                 oa->o_valid |= OBD_MD_FLFLAGS;
895                 oa->o_flags = 0;
896         }
897         oa->o_flags |= OBD_FL_SHRINK_GRANT;
898         osc_update_next_shrink(cli);
899 }
900
901 /* Shrink the current grant, either from some large amount to enough for a
902  * full set of in-flight RPCs, or if we have already shrunk to that limit
903  * then to enough for a single RPC.  This avoids keeping more grant than
904  * needed, and avoids shrinking the grant piecemeal. */
905 static int osc_shrink_grant(struct client_obd *cli)
906 {
907         long target = (cli->cl_max_rpcs_in_flight + 1) *
908                       cli->cl_max_pages_per_rpc;
909
910         client_obd_list_lock(&cli->cl_loi_list_lock);
911         if (cli->cl_avail_grant <= target)
912                 target = cli->cl_max_pages_per_rpc;
913         client_obd_list_unlock(&cli->cl_loi_list_lock);
914
915         return osc_shrink_grant_to_target(cli, target);
916 }
917
/* Give back all available grant above @target pages to the server via a
 * KEY_GRANT_SHRINK set_info RPC.
 *
 * @target is clamped up to cl_max_pages_per_rpc: we never shrink below a
 * single RPC worth of grant.  Returns 0 on success or when no shrink is
 * needed, -ENOMEM if the request body cannot be allocated, or the error
 * from osc_set_info_async(); if the RPC cannot be queued the grant is
 * restored to cl_avail_grant before returning. */
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): the lock was dropped around the allocation above, so
         * cl_avail_grant may have changed since the early-return check; the
         * surplus is therefore recomputed here under the lock. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        /* queue the shrink RPC; on queueing failure put the grant back */
        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant += body->oa.o_grant;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }
        OBD_FREE_PTR(body);
        RETURN(rc);
}
965
966 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
967 static int osc_should_shrink_grant(struct client_obd *client)
968 {
969         cfs_time_t time = cfs_time_current();
970         cfs_time_t next_shrink = client->cl_next_shrink_grant;
971
972         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
973              OBD_CONNECT_GRANT_SHRINK) == 0)
974                 return 0;
975
976         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
977                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
978                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
979                         return 1;
980                 else
981                         osc_update_next_shrink(client);
982         }
983         return 0;
984 }
985
986 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
987 {
988         struct client_obd *client;
989
990         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
991                 if (osc_should_shrink_grant(client))
992                         osc_shrink_grant(client);
993         }
994         return 0;
995 }
996
997 static int osc_add_shrink_grant(struct client_obd *client)
998 {
999         int rc;
1000
1001         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1002                                        TIMEOUT_GRANT,
1003                                        osc_grant_shrink_grant_cb, NULL,
1004                                        &client->cl_grant_shrink_list);
1005         if (rc) {
1006                 CERROR("add grant client %s error %d\n",
1007                         client->cl_import->imp_obd->obd_name, rc);
1008                 return rc;
1009         }
1010         CDEBUG(D_CACHE, "add grant client %s \n",
1011                client->cl_import->imp_obd->obd_name);
1012         osc_update_next_shrink(client);
1013         return 0;
1014 }
1015
1016 static int osc_del_shrink_grant(struct client_obd *client)
1017 {
1018         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1019                                          TIMEOUT_GRANT);
1020 }
1021
/* Initialize cl_avail_grant from the grant the server reported in the
 * connect data @ocd, and start the grant-shrink timer when the server
 * supports OBD_CONNECT_GRANT_SHRINK. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight rpcs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probaly not running"
                      " with patch from bug 20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug 20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s: setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1056
1057 /* We assume that the reason this OSC got a short read is because it read
1058  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1059  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1060  * this stripe never got written at or beyond this stripe offset yet. */
1061 static void handle_short_read(int nob_read, obd_count page_count,
1062                               struct brw_page **pga, int pshift)
1063 {
1064         char *ptr;
1065         int i = 0;
1066
1067         /* skip bytes read OK */
1068         while (nob_read > 0) {
1069                 LASSERT (page_count > 0);
1070
1071                 if (pga[i]->count > nob_read) {
1072                         /* EOF inside this page */
1073                         ptr = cfs_kmap(pga[i]->pg) +
1074                               (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
1075                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1076                         cfs_kunmap(pga[i]->pg);
1077                         page_count--;
1078                         i++;
1079                         break;
1080                 }
1081
1082                 nob_read -= pga[i]->count;
1083                 page_count--;
1084                 i++;
1085         }
1086
1087         /* zero remaining pages */
1088         while (page_count-- > 0) {
1089                 ptr = cfs_kmap(pga[i]->pg) +
1090                       (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
1091                 memset(ptr, 0, pga[i]->count);
1092                 cfs_kunmap(pga[i]->pg);
1093                 i++;
1094         }
1095 }
1096
1097 static int check_write_rcs(struct ptlrpc_request *req,
1098                            int requested_nob, int niocount,
1099                            obd_count page_count, struct brw_page **pga)
1100 {
1101         int    *remote_rcs, i;
1102
1103         /* return error if any niobuf was in error */
1104         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1105                                         sizeof(*remote_rcs) * niocount, NULL);
1106         if (remote_rcs == NULL) {
1107                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1108                 return(-EPROTO);
1109         }
1110         if (lustre_rep_need_swab(req))
1111                 for (i = 0; i < niocount; i++)
1112                         __swab32s(&remote_rcs[i]);
1113
1114         for (i = 0; i < niocount; i++) {
1115                 if (remote_rcs[i] < 0)
1116                         return(remote_rcs[i]);
1117
1118                 if (remote_rcs[i] != 0) {
1119                         CERROR("rc[%d] invalid (%d) req %p\n",
1120                                 i, remote_rcs[i], req);
1121                         return(-EPROTO);
1122                 }
1123         }
1124
1125         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1126                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1127                        req->rq_bulk->bd_nob_transferred, requested_nob);
1128                 return(-EPROTO);
1129         }
1130
1131         return (0);
1132 }
1133
1134 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1135 {
1136         if (p1->flag != p2->flag) {
1137                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1138
1139                 /* warn if we try to combine flags that we don't know to be
1140                  * safe to combine */
1141                 if ((p1->flag & mask) != (p2->flag & mask))
1142                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1143                                "same brw?\n", p1->flag, p2->flag);
1144                 return 0;
1145         }
1146
1147         return (p1->off + p1->count == p2->off);
1148 }
1149
/* Compute a checksum of type @cksum_type over up to @nob bytes spread
 * across @pg_count brw pages.  @opc (OST_READ/OST_WRITE) selects which
 * fault-injection hook may deliberately corrupt data or checksum so that
 * the checksum-mismatch paths can be tested. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type, int pshift)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
                /* the last page may be only partially covered by @nob */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1185
/* Build (but do not send) an OST_READ or OST_WRITE BRW request covering
 * @page_count pages described by @pga.
 *
 * Contiguous pages with compatible flags are merged into single remote
 * niobufs (see can_merge_pages()), a bulk descriptor is attached, cached
 * and dirty state is announced in the obdo, a grant shrink may be
 * piggy-backed, and for checksummed writes the bulk checksum is computed
 * up front.  On success the prepared request is returned via @reqp and
 * the per-request async args are initialized for brw_interpret.
 *
 * @resend non-zero marks the request with OBD_FL_RECOV_RESEND so the
 * server can tell a recovery resend from a fresh write.
 * Returns 0 or a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp, int pshift,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        struct brw_page *pg_prev;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        /* writes draw requests from the import's pre-allocated pool so that
         * dirty pages can still be flushed under memory pressure */
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* count the niobufs left after merging contiguous compatible pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        lustre_set_wire_obdo(&body->oa, oa);
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                /* each fragment must fit within a single CFS page */
                LASSERT(pg->count > 0);
                LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
                         pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
                         i, pg, pg->off, pg->count, pshift);
#ifdef __linux__
                /* pages must arrive in strictly ascending file order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* either all pages use server-side locking or none do */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg,
                                      OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf rather than start a
                         * new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* after the loop, niobuf must point one past the niobuf array */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* opportunistically return surplus grant with this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type, pshift);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (cli->cl_checksum) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash everything the interpret/fini callbacks will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        aa->aa_pshift = pshift;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1366
/* Analyse a client/server checksum comparison for a BRW write.
 *
 * Returns 0 when the checksums match (no resend needed) and 1 when the
 * write should be resent.  mmap'ed files may legitimately change under
 * IO, so a mismatch there returns 1 without logging; otherwise the bulk
 * is re-checksummed locally to classify where the corruption happened
 * and a console error is emitted. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type, int pshift)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* If this is mmaped file - it can be changed at any time */
        if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
                return 1;

        /* use the checksum type the server replied with, falling back to
         * CRC32 when the reply carried no flags */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        /* re-checksum the pages as they are now to localize the damage */
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type, pshift);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);

        return 1;
}
1422
1423 /* Note rc enters this function as number of bytes transferred */
1424 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1425 {
1426         struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1427         const lnet_process_id_t *peer =
1428                         &req->rq_import->imp_connection->c_peer;
1429         struct client_obd *cli = aa->aa_cli;
1430         struct ost_body *body;
1431         __u32 client_cksum = 0;
1432         ENTRY;
1433
1434         if (rc < 0 && rc != -EDQUOT)
1435                 RETURN(rc);
1436
1437         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1438         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1439                                   lustre_swab_ost_body);
1440         if (body == NULL) {
1441                 CERROR ("Can't unpack body\n");
1442                 RETURN(-EPROTO);
1443         }
1444
1445         /* set/clear over quota flag for a uid/gid */
1446         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1447             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1448                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1449                              body->oa.o_gid, body->oa.o_valid,
1450                              body->oa.o_flags);
1451
1452         osc_update_grant(cli, body);
1453
1454         if (rc < 0)
1455                 RETURN(rc);
1456
1457         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1458                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1459
1460         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1461                 if (rc > 0) {
1462                         CERROR ("Unexpected +ve rc %d\n", rc);
1463                         RETURN(-EPROTO);
1464                 }
1465                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1466
1467                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1468                     check_write_checksum(&body->oa, peer, client_cksum,
1469                                          body->oa.o_cksum, aa->aa_requested_nob,
1470                                          aa->aa_page_count, aa->aa_ppga,
1471                                          cksum_type_unpack(aa->aa_oa->o_flags),
1472                                          aa->aa_pshift))
1473                         RETURN(-EAGAIN);
1474
1475                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1476                                      aa->aa_page_count, aa->aa_ppga);
1477                 GOTO(out, rc);
1478         }
1479
1480         /* The rest of this function executes only for OST_READs */
1481         if (rc > aa->aa_requested_nob) {
1482                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1483                        aa->aa_requested_nob);
1484                 RETURN(-EPROTO);
1485         }
1486
1487         if (rc != req->rq_bulk->bd_nob_transferred) {
1488                 CERROR ("Unexpected rc %d (%d transferred)\n",
1489                         rc, req->rq_bulk->bd_nob_transferred);
1490                 return (-EPROTO);
1491         }
1492
1493         if (rc < aa->aa_requested_nob)
1494                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);
1495
1496         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1497                 static int cksum_counter;
1498                 __u32      server_cksum = body->oa.o_cksum;
1499                 char      *via;
1500                 char      *router;
1501                 cksum_type_t cksum_type;
1502
1503                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1504                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1505                 else
1506                         cksum_type = OBD_CKSUM_CRC32;
1507                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1508                                                  aa->aa_ppga, OST_READ,
1509                                                  cksum_type, aa->aa_pshift);
1510
1511                 if (peer->nid == req->rq_bulk->bd_sender) {
1512                         via = router = "";
1513                 } else {
1514                         via = " via ";
1515                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1516                 }
1517
1518                 if (server_cksum == ~0 && rc > 0) {
1519                         CERROR("Protocol error: server %s set the 'checksum' "
1520                                "bit, but didn't send a checksum.  Not fatal, "
1521                                "but please notify on http://bugzilla.lustre.org/\n",
1522                                libcfs_nid2str(peer->nid));
1523                 } else if (server_cksum != client_cksum) {
1524                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1525                                            "%s%s%s inum "LPU64"/"LPU64" object "
1526                                            LPU64"/"LPU64" extent "
1527                                            "["LPU64"-"LPU64"]\n",
1528                                            req->rq_import->imp_obd->obd_name,
1529                                            libcfs_nid2str(peer->nid),
1530                                            via, router,
1531                                            body->oa.o_valid & OBD_MD_FLFID ?
1532                                                 body->oa.o_fid : (__u64)0,
1533                                            body->oa.o_valid & OBD_MD_FLFID ?
1534                                                 body->oa.o_generation :(__u64)0,
1535                                            body->oa.o_id,
1536                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1537                                                 body->oa.o_gr : (__u64)0,
1538                                            aa->aa_ppga[0]->off,
1539                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1540                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1541                                                                         1);
1542                         CERROR("client %x, server %x, cksum_type %x\n",
1543                                client_cksum, server_cksum, cksum_type);
1544                         cksum_counter = 0;
1545                         aa->aa_oa->o_cksum = client_cksum;
1546                         rc = -EAGAIN;
1547                 } else {
1548                         cksum_counter++;
1549                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1550                         rc = 0;
1551                 }
1552         } else if (unlikely(client_cksum)) {
1553                 static int cksum_missed;
1554
1555                 cksum_missed++;
1556                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1557                         CERROR("Checksum %u requested from %s but not sent\n",
1558                                cksum_missed, libcfs_nid2str(peer->nid));
1559         } else {
1560                 rc = 0;
1561         }
1562 out:
1563         if (rc >= 0)
1564                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1565
1566         RETURN(rc);
1567 }
1568
/* Issue one synchronous bulk read/write RPC and wait for its completion.
 *
 * cmd is OBD_BRW_READ or OBD_BRW_WRITE; oa describes the object (the reply
 * updates it via osc_brw_fini_request); pga holds page_count per-page
 * descriptors.  A bulk timeout with rq_resend set rebuilds and resends the
 * request immediately; a recoverable error retries after sleeping, up to the
 * osc_should_resend() limit.  Returns 0 or a negative errno. */
static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga)
{
        struct ptlrpc_request *request;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;
        init_waitqueue_head(&waitq);

restart_bulk:
        /* pshift 0: the synchronous path is always page aligned */
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request, 0, resends);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(request);

        if (rc == -ETIMEDOUT && request->rq_resend) {
                DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
                ptlrpc_req_finished(request);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(request, rc);

        ptlrpc_req_finished(request);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* linear backoff: sleep 'resends' seconds before retrying;
                 * the wait condition is constant 0 so only the timeout (or a
                 * signal) ends the sleep */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
        RETURN(rc);
}
1613
/* Rebuild a failed bulk RPC and queue the replacement on the old request's
 * set.  The new request takes over the old one's pga array, async args and
 * oap list; each oap holding a reference on the old request is switched to
 * reference the new one.  Returns 0 on success, -EIO when the resend limit
 * is exhausted, -EINTR if an oap was interrupted, or a prep error. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resent retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req,
                                  aa->aa_pshift, 1);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out before the handover if any oap in the RPC was already
         * interrupted; the new request is dropped and the caller sees -EINTR */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* move each oap's request reference from the old request to the new */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1685
/* Build one bulk RPC for a direct-I/O style brw and add it to @set without
 * waiting.  Write grants are consumed up front for aligned writes and given
 * back (with cache waiters woken) if request preparation fails.  On success
 * the RPC is accounted in the cl_dio_{r,w}_in_flight counters and completed
 * later by brw_interpret().  Returns 0 or a negative errno from prep. */
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          int pshift)
{
        struct ptlrpc_request     *request;
        struct client_obd         *cli = &exp->exp_obd->u.cli;
        int                        rc, i;
        struct osc_brw_async_args *aa;
        ENTRY;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        /* FIXME: unaligned writes must use write grants too */
        if (cmd == OBD_BRW_WRITE && pshift == 0) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                }
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request, pshift, 0);

        /* async args live inside the request; make sure they fit */
        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));

        if (rc == 0) {
                aa = ptlrpc_req_async_args(request);
                /* Do we need to separate dio stats? */
                if (cmd == OBD_BRW_READ) {
                        lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                } else {
                        lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                         cli->cl_w_in_flight);
                }
                ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);

                /* dio RPCs carry no oaps; brw_interpret() keys off this */
                LASSERT(list_empty(&aa->aa_oaps));

                request->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, request);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (cmd == OBD_BRW_READ)
                        cli->cl_dio_r_in_flight++;
                else
                        cli->cl_dio_w_in_flight++;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
        } else if (cmd == OBD_BRW_WRITE) {
                /* prep failed: return the grants taken above */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                osc_wake_cache_waiters(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        }

        RETURN (rc);
}
1750
/*
 * ugh, we want disk allocation on the target to happen in offset order.  we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
1758 static void sort_brw_pages(struct brw_page **array, int num)
1759 {
1760         int stride, i, j;
1761         struct brw_page *tmp;
1762
1763         if (num == 1)
1764                 return;
1765         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1766                 ;
1767
1768         do {
1769                 stride /= 3;
1770                 for (i = stride ; i < num ; i++) {
1771                         tmp = array[i];
1772                         j = i;
1773                         while (j >= stride && array[j-stride]->off > tmp->off) {
1774                                 array[j] = array[j - stride];
1775                                 j -= stride;
1776                         }
1777                         array[j] = tmp;
1778                 }
1779         } while (stride > 1);
1780 }
1781
1782 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1783                                         int pshift)
1784 {
1785         int count = 1;
1786         int offset;
1787         int i = 0;
1788
1789         LASSERT (pages > 0);
1790         offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1791
1792         for (;;) {
1793                 pages--;
1794                 if (pages == 0)         /* that's all */
1795                         return count;
1796
1797                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1798                         return count;   /* doesn't end on page boundary */
1799
1800                 i++;
1801                 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1802                 if (offset != 0)        /* doesn't start on page boundary */
1803                         return count;
1804
1805                 count++;
1806         }
1807 }
1808
1809 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1810 {
1811         struct brw_page **ppga;
1812         int i;
1813
1814         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1815         if (ppga == NULL)
1816                 return NULL;
1817
1818         for (i = 0; i < count; i++)
1819                 ppga[i] = pga + i;
1820         return ppga;
1821 }
1822
/* Free a pointer array built by osc_build_ppga().  @count must be the
 * original element count since the allocation size is recomputed here. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1828
/* Synchronous brw entry point: split @pga into batches no larger than
 * cl_max_pages_per_rpc (and no larger than the leading unfragmented run)
 * and issue each batch through osc_brw_internal(), in offset order.
 * OBD_BRW_CHECK only probes whether the import looks usable.  Because the
 * server reply clobbers the caller's oa, a copy is saved before the first
 * multi-batch RPC and restored before every later one. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        /* keep the original pointer/count: ppga is advanced per batch but
         * the whole array is freed at out: */
        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the batch to a contiguous page-aligned run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1902
/* Asynchronous brw entry point: like osc_brw() but each batch is queued on
 * @set via async_internal() instead of being waited on.  When more than one
 * RPC will fly, each gets a private copy of the pga slice and of the oa; the
 * oa copy is tagged OBD_FL_TEMPORARY so brw_interpret() knows to free it.
 * Ownership of a slice passes to async_internal() on success. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set, int pshift)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                struct obdo *oa;
                obd_count pages_per_brw;

                /* one page less under unaligned direct i/o */
                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
                                      !!pshift);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
                                                       pshift);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));

                        /* private oa per RPC, flagged TEMPORARY so the
                         * interpret callback frees it */
                        OBDO_ALLOC(oa);
                        if (oa == NULL) {
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                                GOTO(out, rc = -ENOMEM);
                        }
                        memcpy(oa, oinfo->oi_oa, sizeof(*oa));
                        if (oa->o_valid & OBD_MD_FLFLAGS) {
                                oa->o_flags |= OBD_FL_TEMPORARY;
                        } else {
                                oa->o_valid |= OBD_MD_FLFLAGS;
                                oa->o_flags = OBD_FL_TEMPORARY;
                        }
                } else {
                        /* single RPC: reuse the caller's array and oa */
                        copy = ppga;
                        oa = oinfo->oi_oa;
                        LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
                }

                rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
                                    copy, set, pshift);

                if (rc != 0) {
                        /* undo the per-RPC allocations made above */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));

                        if (oa->o_valid & OBD_MD_FLFLAGS &&
                            oa->o_flags & OBD_FL_TEMPORARY)
                                OBDO_FREE(oa);
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1994
1995 static void osc_check_rpcs(struct client_obd *cli);
1996
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* give the page's write grant back; @sent tells the grant code
         * whether the page actually made it to the wire */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
2005
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns 1 when @lop has enough (or urgent enough) pages queued in the
 * @cmd direction to justify firing an RPC now, 0 otherwise. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
2056
2057 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2058 {
2059         struct osc_async_page *oap;
2060         ENTRY;
2061
2062         if (list_empty(&lop->lop_urgent))
2063                 RETURN(0);
2064
2065         oap = list_entry(lop->lop_urgent.next,
2066                          struct osc_async_page, oap_urgent_item);
2067
2068         if (oap->oap_async_flags & ASYNC_HP) {
2069                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2070                 RETURN(1);
2071         }
2072
2073         RETURN(0);
2074 }
2075
/* Reconcile @item's membership of @list with the boolean @should_be_on:
 * add it when it should be listed but isn't, remove it when it is listed
 * but shouldn't be, and do nothing when the state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (should_be_on) {
                if (list_empty(item))
                        list_add_tail(item, list);
        } else {
                if (!list_empty(item))
                        list_del_init(item);
        }
}
2084
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* an loi sits on at most one of the two ready lists: high-priority
         * work moves it to cl_loi_hp_ready_list and off the normal list */
        if (lop_makes_hprpc(&loi->loi_write_lop) ||
            lop_makes_hprpc(&loi->loi_read_lop)) {
                /* HP rpc */
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
        } else {
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
        }

        /* write/read list membership simply mirrors whether any pages are
         * pending in that direction */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
2107
2108 static void lop_update_pending(struct client_obd *cli,
2109                                struct loi_oap_pages *lop, int cmd, int delta)
2110 {
2111         lop->lop_num_pending += delta;
2112         if (cmd & OBD_BRW_WRITE)
2113                 cli->cl_pending_w_pages += delta;
2114         else
2115                 cli->cl_pending_r_pages += delta;
2116 }
2117
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: pull the oap off the queues, fix up the
                 * pending accounting and complete it toward the group now */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
2163
2164 /* this is trying to propogate async writeback errors back up to the
2165  * application.  As an async write fails we record the error code for later if
2166  * the app does an fsync.  As long as errors persist we force future rpcs to be
2167  * sync so that the app can get a sync error and break the cycle of queueing
2168  * pages for which writeback will fail. */
2169 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2170                            int rc)
2171 {
2172         if (rc) {
2173                 if (!ar->ar_rc)
2174                         ar->ar_rc = rc;
2175
2176                 ar->ar_force_sync = 1;
2177                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2178                 return;
2179
2180         }
2181
2182         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2183                 ar->ar_force_sync = 0;
2184 }
2185
/* Queue @oap on its loi's pending list for its direction, and on the urgent
 * list as well when flagged: ASYNC_HP pages go to the head of the urgent
 * queue, ASYNC_URGENT pages to its tail.  Updates the pending-page counts.
 * NOTE(review): callers appear to hold cl_loi_list_lock here -- confirm. */
static void osc_oap_to_pending(struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_HP)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        else if (oap->oap_async_flags & ASYNC_URGENT)
                list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
2202
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request
 *
 * Finish one async page: drop its request reference, feed the result into
 * the async-rc machinery for writes, copy attributes out of @oa on success,
 * then complete it toward its group i/o or the upper layer.  @sent says
 * whether the page reached the wire (passed through to osc_exit_cache). */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;

        /* writes feed per-client and per-object async-error state so a
         * later fsync can see the failure and force sync i/o */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* mirror server-returned attributes into the cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* group i/o pages complete toward the oig instead of the caller */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2259
/* Interpret callback for async bulk RPCs (both the cached-page path from
 * osc_send_oap_rpc() and the dio path from async_internal()).  Retries
 * recoverable errors via osc_brw_redo_request(), then tears down the RPC:
 * in-flight counters, per-oap completion or grant release, oa ownership,
 * and the pga pointer array. */
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        if (osc_recoverable_error(rc)) {
                /* Only retry once for mmaped files since the mmaped page
                 * might be modified at anytime. We have to retry at least
                 * once in case there WAS really a corruption of the page
                 * on the network, that was not caused by mmap() modifying
                 * the page. bug 11742 */
                if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
                    aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
                    aa->aa_oa->o_flags & OBD_FL_MMAP) {
                        rc = 0;
                } else {
                        /* the redo request inherits this RPC's state; if it
                         * was queued successfully we are done here */
                        rc = osc_brw_redo_request(request, aa);
                        if (rc == 0)
                                RETURN(0);
                }
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;

                /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
                 * is called so we know whether to go to sync BRWs or wait for more
                 * RPCs to complete */
                if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                        cli->cl_w_in_flight--;
                else
                        cli->cl_r_in_flight--;

                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                obd_count i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);

                /* OBD_FL_TEMPORARY marks an oa cloned by osc_brw_async()
                 * for this RPC; it is ours to free */
                if (aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
                    aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
                        OBDO_FREE(aa->aa_oa);

                if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                        cli->cl_dio_w_in_flight--;
                else
                        cli->cl_dio_r_in_flight--;
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
2328
/* Build a single bulk BRW RPC from the osc_async_pages on @rpc_list.
 * On success the oaps are moved onto the request's async args (aa_oaps)
 * and @rpc_list is reinitialized empty; on failure an ERR_PTR is returned
 * and the caller keeps ownership of the list.  Called with
 * cl_loi_list_lock dropped (see osc_send_oap_rpc). */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct ldlm_lock *lock = NULL;
        obd_valid valid;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* gather the brw_page pointers; caller ops/data and the ldlm lock
         * are taken from the first oap on the list */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0,
                                  0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }
        /* from here on oa aliases the ost_body inside the request buffer,
         * so the timestamp updates below land directly in the message */
        oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                                 sizeof(struct ost_body)))->oa;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        if (pga[0]->flag & OBD_BRW_SRVLOCK) {
                /* in case of lockless read/write do not use inode's
                 * timestamps because concurrent stat might fill the
                 * inode with out-of-date times, send current
                 * instead */
                if (cmd & OBD_BRW_WRITE) {
                        oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                        valid = OBD_MD_FLATIME;
                } else {
                        oa->o_atime = LTIME_S(CURRENT_TIME);
                        oa->o_valid |= OBD_MD_FLATIME;
                        valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                }
        } else {
                valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        }
        ops->ap_update_obdo(caller_data, cmd, oa, valid);

        /* hand the oap list over to the request's async args for
         * brw_interpret() to complete later */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2426
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
/**
 * prepare pages for ASYNC io and put pages in send queue.
 *
 * \param cli - client obd
 * \param loi - lov oinfo (per-object state)
 * \param cmd - OBD_BRW_* macros
 * \param lop - pending pages
 *
 * \return zero if pages were successfully added to the send queue.
 * \return nonzero if an error occurred.
 */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        ENTRY;

        /* If there are HP OAPs we need to handle at least 1 of them,
         * move it to the beginning of the pending list for that. */
        if (!list_empty(&lop->lop_urgent)) {
                oap = list_entry(lop->lop_urgent.next,
                                 struct osc_async_page, oap_urgent_item);
                if (oap->oap_async_flags & ASYNC_HP)
                        list_move(&oap->oap_pending_item, &lop->lop_pending);
        }

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                /* don't mix server-locked and client-locked pages in one RPC */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }
                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                spin_unlock(&oap->oap_lock);
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_READY;
                                spin_unlock(&oap->oap_lock);
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                 if(!(PageLocked(oap->oap_page) &&
                     (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
                        CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
                               oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
                        LBUG();
                }
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* caller holds cl_loi_list_lock; drop it across the allocation-heavy
         * request build and retake it below */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        /* record per-RPC stats (page count, RPCs in flight, start offset) */
        aa = ptlrpc_req_async_args(req);
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        }
        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2661
/* Dump an loi's ready/pending read and write state for debugging.
 * Note: the macro must end after "args)" -- the previous version carried a
 * stray trailing backslash that continued the definition onto the next
 * (blank) line, which would silently swallow any statement added there. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2672 /* This is called by osc_check_rpcs() to find which objects have pages that
2673  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2674 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2675 {
2676         ENTRY;
2677         /* First return objects that have blocked locks so that they
2678          * will be flushed quickly and other clients can get the lock,
2679          * then objects which have pages ready to be stuffed into RPCs */
2680         if (!list_empty(&cli->cl_loi_hp_ready_list))
2681                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2682                                   struct lov_oinfo, loi_hp_ready_item));
2683         if (!list_empty(&cli->cl_loi_ready_list))
2684                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2685                                   struct lov_oinfo, loi_ready_item));
2686
2687         /* then if we have cache waiters, return all objects with queued
2688          * writes.  This is especially important when many small files
2689          * have filled up the cache and not been fired into rpcs because
2690          * they don't pass the nr_pending/object threshhold */
2691         if (!list_empty(&cli->cl_cache_waiters) &&
2692             !list_empty(&cli->cl_loi_write_list))
2693                 RETURN(list_entry(cli->cl_loi_write_list.next,
2694                                   struct lov_oinfo, loi_write_item));
2695
2696         /* then return all queued objects when we have an invalid import
2697          * so that they get flushed */
2698         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2699                 if (!list_empty(&cli->cl_loi_write_list))
2700                         RETURN(list_entry(cli->cl_loi_write_list.next,
2701                                           struct lov_oinfo, loi_write_item));
2702                 if (!list_empty(&cli->cl_loi_read_list))
2703                         RETURN(list_entry(cli->cl_loi_read_list.next,
2704                                           struct lov_oinfo, loi_read_item));
2705         }
2706         RETURN(NULL);
2707 }
2708
2709 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2710 {
2711         struct osc_async_page *oap;
2712         int hprpc = 0;
2713
2714         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2715                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2716                                  struct osc_async_page, oap_urgent_item);
2717                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2718         }
2719
2720         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2721                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2722                                  struct osc_async_page, oap_urgent_item);
2723                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2724         }
2725
2726         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2727 }
2728
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        /* Keep dispatching RPCs for the next eligible object until we hit
         * the in-flight limit, run out of ready objects, or fail to make
         * progress too many times in a row. */
        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_hp_ready_item))
                        list_del_init(&loi->loi_hp_ready_item);
                if (!list_empty(&loi->loi_ready_item))
                        list_del_init(&loi->loi_ready_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2792
2793 /* we're trying to queue a page in the osc so we're subject to the
2794  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2795  * If the osc's queued pages are already at that limit, then we want to sleep
2796  * until there is space in the osc's queue for us.  We also may be waiting for
2797  * write credits from the OST if there are RPCs in flight that may return some
2798  * before we fall back to sync writes.
2799  *
2800  * We need this know our allocation was granted in the presence of signals */
2801 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2802 {
2803         int rc;
2804         ENTRY;
2805         client_obd_list_lock(&cli->cl_loi_list_lock);
2806         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2807         client_obd_list_unlock(&cli->cl_loi_list_lock);
2808         RETURN(rc);
2809 };
2810
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 when the page has been accounted against the dirty cache and
 * grant, -EDQUOT to force the caller to fall back to sync i/o, -EINTR if
 * the wait for cache space was interrupted, or the rc the waker stored. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* It is safe to block as a cache waiter as long as there is grant
         * space available or the hope of additional grant being returned
         * when an in flight write completes.  Using the write back cache
         * if possible is preferable to sending the data synchronously
         * because write pages can then be merged in to large requests.
         * The addition of this cache waiter will cause pending write
         * pages to be sent immediately. */
        if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                /* drop the lock while we sleep; ocw_granted() retakes it to
                 * test the wait condition */
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (!list_empty(&ocw.ocw_entry)) {
                        /* still on the waiter list: the wait ended without a
                         * grant (signal, or no RPCs left in flight) */
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2870
2871 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2872                         void **res, int rw, obd_off start, obd_off end,
2873                         struct lustre_handle *lockh, int flags)
2874 {
2875         struct ldlm_lock *lock = NULL;
2876         int rc, release = 0;
2877
2878         ENTRY;
2879
2880         if (lockh && lustre_handle_is_used(lockh)) {
2881                 /* if a valid lockh is passed, just check that the corresponding
2882                  * lock covers the extent */
2883                 lock = ldlm_handle2lock(lockh);
2884                 release = 1;
2885         } else {
2886                 struct osc_async_page *oap = *res;
2887                 spin_lock(&oap->oap_lock);
2888                 lock = oap->oap_ldlm_lock;
2889                 if (likely(lock))
2890                         LDLM_LOCK_GET(lock);
2891                 spin_unlock(&oap->oap_lock);
2892         }
2893         /* lock can be NULL in case race obd_get_lock vs lock cancel
2894          * so we should be don't try match this */
2895         if (unlikely(!lock))
2896                 return 0;
2897
2898         rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2899         if (release == 1 && rc == 1)
2900                 /* if a valid lockh was passed, we just need to check
2901                  * that the lock covers the page, no reference should be
2902                  * taken*/
2903                 ldlm_lock_decref(lockh,
2904                                  rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2905         LDLM_LOCK_PUT(lock);
2906         RETURN(rc);
2907 }
2908
/* Initialize an osc_async_page descriptor for @page, or report how much
 * space the caller must reserve for one.
 *
 * \param exp    export to the OST this page belongs to
 * \param loi    per-stripe object info the page is associated with
 * \param page   page to wrap; if NULL, only the rounded descriptor size
 *               is returned so callers can size their private data
 * \param offset byte offset of the page within the object
 * \param ops    caller-supplied callbacks (fill obdo, completion, ...)
 * \param data   opaque cookie passed back through @ops
 * \param res    caller-provided storage for the descriptor (in/out)
 * \param flags  OBD_PAGE_NO_CACHE skips attaching the page to the lock cache
 * \param lockh  lock under which the page is inserted into the cache
 *
 * \retval size_round(sizeof(*oap)) when @page is NULL
 * \retval 0 on success
 * \retval negative errno from cache_add_extent() on failure
 */
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                        struct lov_oinfo *loi, cfs_page_t *page,
                        obd_off offset, struct obd_async_page_ops *ops,
                        void *data, void **res, int flags,
                        struct lustre_handle *lockh)
{
        struct osc_async_page *oap;
        struct ldlm_res_id oid = {{0}};
        int rc = 0;

        ENTRY;

        /* Sizing-only call: tell the caller how much room an oap needs. */
        if (!page)
                return size_round(sizeof(*oap));

        oap = *res;
        oap->oap_magic = OAP_MAGIC;
        oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_loi = loi;

        oap->oap_caller_ops = ops;
        oap->oap_caller_data = data;

        oap->oap_page = page;
        oap->oap_obj_off = offset;

        /* Empty list heads so later list_empty() checks are meaningful. */
        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
        CFS_INIT_LIST_HEAD(&oap->oap_page_list);

        oap->oap_occ.occ_interrupted = osc_occ_interrupted;

        spin_lock_init(&oap->oap_lock);

        /* If the page was marked as notcacheable - don't add to any locks */
        if (!(flags & OBD_PAGE_NO_CACHE)) {
                osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
                /* This is the only place where we can call cache_add_extent
                   without oap_lock, because this page is locked now, and
                   the lock we are adding it to is referenced, so cannot lose
                   any pages either. */
                rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
                if (rc)
                        RETURN(rc);
        }

        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
        RETURN(0);
}
2959
2960 struct osc_async_page *oap_from_cookie(void *cookie)
2961 {
2962         struct osc_async_page *oap = cookie;
2963         if (oap->oap_magic != OAP_MAGIC)
2964                 return ERR_PTR(-EINVAL);
2965         return oap;
2966 };
2967
/* Queue a prepared page for asynchronous I/O.
 *
 * Validates that the page is not already queued, enforces write quota,
 * fills in the per-I/O fields of the oap, reserves write cache/grant
 * space for writes, and finally moves the page onto the pending list and
 * pokes RPC generation.
 *
 * \retval 0 on success
 * \retval -EIO if the import is missing or invalid
 * \retval -EBUSY if the page is already on a pending/urgent/rpc list
 * \retval -EDQUOT if the file owner/group is over quota (writes only)
 * \retval other negative errno on allocation or cache-entry failure
 */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* A page may only be in flight through one queue at a time. */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* Ask the caller to fill in uid/gid so quota can be checked. */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        /* oap_lock provides atomic access to oap_async_flags. */
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE) {
                /* Reserve dirty-cache/grant space; may block or fail. */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
3045
/* True iff @flag is being newly set: clear in @was and set in @now.
 * (aka (~was & now & flag), but this is more clear :)
 * Arguments are fully parenthesized so compound expressions such as
 * SETTING(x, y, A | B) expand correctly. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
3048
/* Raise additional async flags on an already-queued page.
 *
 * Only ASYNC_READY and ASYNC_URGENT transitions are handled; setting
 * ASYNC_URGENT also moves the page onto the urgent list (at the head if
 * it is already high-priority).  The page must be on a pending list.
 *
 * \retval 0 on success (including "flags already set")
 * \retval -EINVAL if the page is not on a pending list
 * \retval -EIO if the import is missing or invalid
 */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* Pick the read or write page accounting for this object. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* oap_lock provides atomic semantics of oap_async_flags access */
        spin_lock(&oap->oap_lock);
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* Nothing new to set: all requested flags are already present. */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            list_empty(&oap->oap_rpc_item)) {
                /* High-priority pages jump to the head of the urgent list. */
                if (oap->oap_async_flags & ASYNC_HP)
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
                oap->oap_async_flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        spin_unlock(&oap->oap_lock);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3116
/* Queue a page for group I/O (a batch that is triggered as a unit later
 * by osc_trigger_group_io()).
 *
 * Like osc_queue_async_io() but without quota or cache reservation; the
 * page lands on lop_pending_group rather than the regular pending list.
 * For ASYNC_GROUP_SYNC pages the oap is also registered with @oig so
 * completion can be waited on.
 *
 * \retval 0 on success
 * \retval -EIO if the import is missing or invalid
 * \retval -EBUSY if the page is already queued somewhere
 * \retval negative errno from oig_add_one() on failure
 */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* A page may only be in flight through one queue at a time. */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        /* oap_lock provides atomic access to oap_async_flags. */
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                /* Register with the group so callers can wait for it. */
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
3176
3177 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3178                                  struct loi_oap_pages *lop, int cmd)
3179 {
3180         struct list_head *pos, *tmp;
3181         struct osc_async_page *oap;
3182
3183         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3184                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3185                 list_del(&oap->oap_pending_item);
3186                 osc_oap_to_pending(oap);
3187         }
3188         loi_list_maint(cli, loi);
3189 }
3190
3191 static int osc_trigger_group_io(struct obd_export *exp,
3192                                 struct lov_stripe_md *lsm,
3193                                 struct lov_oinfo *loi,
3194                                 struct obd_io_group *oig)
3195 {
3196         struct client_obd *cli = &exp->exp_obd->u.cli;
3197         ENTRY;
3198
3199         if (loi == NULL)
3200                 loi = lsm->lsm_oinfo[0];
3201
3202         client_obd_list_lock(&cli->cl_loi_list_lock);
3203
3204         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3205         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3206
3207         osc_check_rpcs(cli);
3208         client_obd_list_unlock(&cli->cl_loi_list_lock);
3209
3210         RETURN(0);
3211 }
3212
/* Remove a page from all OSC queues and accounting before it is freed.
 *
 * Releases any dirty-cache reservation, drops the page from the urgent
 * and pending lists (updating counters), and detaches it from the lock
 * cache.  Fails if the page is already part of an in-flight RPC.
 *
 * \retval 0 on success
 * \retval -EBUSY if the page is on an rpc list
 */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* Pick the read or write page accounting for this object. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* Cannot tear down a page that is part of an in-flight RPC. */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* Return the page's grant/cache reservation and wake writers
         * that may be blocked waiting for cache space. */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                spin_unlock(&oap->oap_lock);
        }

        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3263
/* LDLM blocking/cancel callback for OSC extent locks.
 *
 * LDLM_CB_BLOCKING: a conflicting request wants the lock; cancel it.
 * LDLM_CB_CANCELING: the lock is going away; flush it from the client
 * lock cache and invoke the optional per-obd extent-cancel callback.
 *
 * \retval 0 always (errors from ldlm_cli_cancel are only logged)
 */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* Catch obviously-corrupt ast_data cookies early. */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_extent_blocking_cb);
3306
/* Attach caller data (an inode pointer on Linux) to a lock, verifying we
 * are not silently replacing live ast_data that belongs to a different
 * inode.  Replacing data of an inode being freed (I_FREEING) is allowed;
 * anything else trips an assertion.  Also propagates LDLM_FL_NO_LRU from
 * @flags onto the lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
3337
3338 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3339                              ldlm_iterator_t replace, void *data)
3340 {
3341         struct ldlm_res_id res_id;
3342         struct obd_device *obd = class_exp2obd(exp);
3343
3344         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3345         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3346         return 0;
3347 }
3348
3349 /* find any ldlm lock of the inode in osc
3350  * return 0    not find
3351  *        1    find one
3352  *      < 0    error */
3353 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3354                            ldlm_iterator_t replace, void *data)
3355 {
3356         struct ldlm_res_id res_id;
3357         struct obd_device *obd = class_exp2obd(exp);
3358         int rc = 0;
3359
3360         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3361         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3362         if (rc == LDLM_ITER_STOP)
3363                 return(1);
3364         if (rc == LDLM_ITER_CONTINUE)
3365                 return(0);
3366         return(rc);
3367 }
3368
/* Common post-processing after an extent-lock enqueue completes.
 *
 * For intent enqueues that were aborted by the server, the real status
 * is carried in the ldlm reply's lock_policy_res1.  On success the lock
 * is added to the client lock cache.  The caller's oi_cb_up callback is
 * always invoked last with the final status.
 *
 * \retval the value returned by oi_cb_up
 */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* The server's real verdict overrides ABORTED. */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
3403
/* ptlrpc interpret callback for asynchronous extent-lock enqueues.
 *
 * Finishes the ldlm enqueue (unpacking the LVB), runs the common OSC
 * completion in osc_enqueue_fini(), and then drops the lock reference
 * that the async enqueue left held (see the comment above osc_enqueue():
 * async locks are released as soon as they are obtained). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 void *data, int rc)
{
        struct osc_enqueue_args *aa = data;
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3437
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/* Obtain an extent lock for the range in oinfo->oi_policy, either by
 * matching an already-cached lock or by enqueueing a new one (possibly
 * asynchronously via @rqset).  The completion callback oi_cb_up is run
 * from osc_enqueue_fini() in all cases. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_gr, &res_id);
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* Without a valid known-minimum-size there is nothing cached worth
         * matching; go straight to a fresh enqueue. */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* Intent enqueues carry an extra reply buffer for the LVB. */
                __u32 size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* Async path: completion handled by the interpreter. */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Sync path: finish up inline. */
        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3562
3563 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3564                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3565                      int *flags, void *data, struct lustre_handle *lockh,
3566                      int *n_matches)
3567 {
3568         struct ldlm_res_id res_id;
3569         struct obd_device *obd = exp->exp_obd;
3570         int lflags = *flags;
3571         ldlm_mode_t rc;
3572         ENTRY;
3573
3574         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3575
3576         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3577
3578         /* Filesystem lock extents are extended to page boundaries so that
3579          * dealing with the page cache is a little smoother */
3580         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3581         policy->l_extent.end |= ~CFS_PAGE_MASK;
3582
3583         /* Next, search for already existing extent locks that will cover us */
3584         /* If we're trying to read, we also search for an existing PW lock.  The
3585          * VFS and page cache already protect us locally, so lots of readers/
3586          * writers can share a single PW lock. */
3587         rc = mode;
3588         if (mode == LCK_PR)
3589                 rc |= LCK_PW;
3590         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3591                              &res_id, type, policy, rc, lockh);
3592         if (rc) {
3593                 osc_set_data_with_check(lockh, data, lflags);
3594                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3595                         ldlm_lock_addref(lockh, LCK_PR);
3596                         ldlm_lock_decref(lockh, LCK_PW);
3597                 }
3598                 if (n_matches != NULL)
3599                         (*n_matches)++;
3600         }
3601
3602         RETURN(rc);
3603 }
3604
3605 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3606                       __u32 mode, struct lustre_handle *lockh, int flags,
3607                       obd_off end)
3608 {
3609         ENTRY;
3610
3611         if (unlikely(mode == LCK_GROUP))
3612                 ldlm_lock_decref_and_cancel(lockh, mode);
3613         else
3614                 ldlm_lock_decref(lockh, mode);
3615
3616         RETURN(0);
3617 }
3618
3619 static int osc_cancel_unused(struct obd_export *exp,
3620                              struct lov_stripe_md *lsm, int flags, void *opaque)
3621 {
3622         struct obd_device *obd = class_exp2obd(exp);
3623         struct ldlm_res_id res_id, *resp = NULL;
3624
3625         if (lsm != NULL) {
3626                 resp = osc_build_res_name(lsm->lsm_object_id,
3627                                           lsm->lsm_object_gr, &res_id);
3628         }
3629
3630         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3631
3632 }
3633
3634 static int osc_join_lru(struct obd_export *exp,
3635                         struct lov_stripe_md *lsm, int join)
3636 {
3637         struct obd_device *obd = class_exp2obd(exp);
3638         struct ldlm_res_id res_id, *resp = NULL;
3639
3640         if (lsm != NULL) {
3641                 resp = osc_build_res_name(lsm->lsm_object_id,
3642                                           lsm->lsm_object_gr, &res_id);
3643         }
3644
3645         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3646
3647 }
3648
/* ptlrpc interpret callback for asynchronous OST_STATFS.
 *
 * Unpacks the reply, refreshes the object-creation flags (DEGRADED,
 * RDONLY, NOSPC) derived from the OST's reported state and free space,
 * copies the statfs result to the caller, and runs oi_cb_up with the
 * final status. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                void *data, int rc)
{
        struct osc_async_args *aa = data;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        __u64 used;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* With NODELAY the caller prefers stale-but-fast: treat a
         * disconnected/busy target as a benign empty answer. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        spin_lock(&cli->cl_oscc.oscc_lock);

        if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;

        if (unlikely(msfs->os_state & OS_STATE_READONLY))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;

        /* Add a bit of hysteresis so this flag isn't continually flapping,
         * and ensure that new files don't get extremely fragmented due to
         * only a small amount of available space in the filesystem.
         * We want to set the NOSPC flag when there is less than ~0.1% free
         * and clear it when there is at least ~0.2% free space, so:
         *                   avail < ~0.1% max          max = avail + used
         *            1025 * avail < avail + used       used = blocks - free
         *            1024 * avail < used
         *            1024 * avail < blocks - free
         *                   avail < ((blocks - free) >> 10)
         *
         * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
         * lose that amount of space so in those cases we report no space left
         * if their is less than 1 GB left.                             */
        used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
                     ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
        else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
                        cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;

        spin_unlock(&cli->cl_oscc.oscc_lock);

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3723
/* Issue an OST_STATFS RPC asynchronously.
 *
 * The prepared request is added to @rqset; the reply is handled later by
 * osc_statfs_interpret(), which fills oinfo->oi_osfs and invokes the
 * oinfo->oi_cb_up() completion callback.
 *
 * \param obd     OSC device; the RPC goes out on obd->u.cli.cl_import
 * \param oinfo   carries the statfs buffer, flags and completion callback
 * \param max_age not sent to the server (see the comment below)
 * \param rqset   request set the RPC is queued on
 *
 * \retval 0        request queued successfully
 * \retval -ENOMEM  request allocation failed
 */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        /* statfs is served by the OST create threads, not the I/O portal */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not block waiting for a
                 * stuck OST: disable resend and delay so the reader does
                 * not deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = osc_statfs_interpret;
        /* async args live inside the request; make sure they fit */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3760
/* Synchronous OST_STATFS: send the RPC, wait for the reply and copy the
 * unpacked obd_statfs into @osfs.
 *
 * \param obd     OSC device to query
 * \param osfs    output buffer for the filesystem statistics
 * \param max_age not sent to the server (see the comment below)
 * \param flags   OBD_STATFS_* flags; NODELAY marks a procfs-style request
 *
 * \retval 0         success, @osfs filled in
 * \retval -ENODEV   no import (device being disconnected)
 * \retval -ENOMEM   request allocation failed
 * \retval -EPROTO   reply could not be unpacked
 * \retval negative  other RPC error from ptlrpc_queue_wait()
 */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        int rc;
        ENTRY;

        /* The request may also come from lprocfs, so take a reference on
         * the import under cl_sem to synchronize with
         * client_disconnect_export() tearing it down (bug 15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);

        /* the request holds its own import reference from here on */
        class_import_put(imp);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        /* statfs is served by the OST create threads, not the I/O portal */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not block waiting for a
                 * stuck OST: disable resend and delay so the reader does
                 * not deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3821
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        int rc = 0, lum_size;
        struct lov_user_ost_data_v1 *lmm_objects;
        ENTRY;

        /* no striping metadata means there is nothing to report */
        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        memset(&lum, 0x00, sizeof(lum));
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* user buffer has room for object entries: allocate a
                 * kernel copy large enough for them */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);
                /* v1 and v3 place lmm_objects[] at different offsets, so
                 * locate the array according to the requested magic */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                /* an OSC object has a single stripe, so only the first
                 * object slot is filled in */
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* header only: reuse the on-stack copy */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_magic = lum.lmm_magic;
        /* the OSC always presents exactly one stripe */
        lumk->lmm_stripe_count = 1;
        lumk->lmm_object_id = lsm->lsm_object_id;

        if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
            (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
               /* lsm not in host order, so count also need be in same order */
                __swab32s(&lumk->lmm_magic);
                __swab16s(&lumk->lmm_stripe_count);
                lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
                if (lum.lmm_stripe_count > 0)
                        lustre_swab_lov_user_md_objects(
                                (struct lov_user_md_v1*)lumk);
        }

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        /* free the kernel copy only if one was allocated above */
        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3895
3896
/* ioctl dispatcher for the OSC device.
 *
 * Takes a module reference for the duration of the call so the module
 * cannot be unloaded mid-ioctl; every exit path goes through "out" to
 * drop it again.
 *
 * \param cmd   ioctl command number
 * \param exp   export the ioctl arrived on
 * \param len   length of @karg
 * \param karg  kernel-space argument (struct obd_ioctl_data for most cmds)
 * \param uarg  original user-space pointer, used by commands that copy
 *              data to/from user space themselves
 *
 * \retval 0 or a negative errno; -ENOTTY for unrecognised commands
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        /* report a synthetic single-target LOV configuration: the OSC
         * pretends to be a one-OST LOV for tools that expect one */
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* caller's inline buffers must be able to hold the
                 * descriptor and the uuid respectively */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size on success */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        /* destroy an OST object by id; admin-only */
        case OBD_IOC_DESTROY: {
                struct obdo            *oa;

                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;

                if (oa->o_id == 0)
                        GOTO(out, err = -EINVAL);

                /* o_gr in the obdo is meaningful; tell the server so */
                oa->o_valid |= OBD_MD_FLGROUP;

                err = osc_destroy(exp, oa, NULL, NULL, NULL);
                GOTO(out, err);
        }
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3998
/* obd_get_info() method: answer key/value queries about this OSC.
 *
 * Keys handled locally: KEY_LOCK_TO_STRIPE (always stripe 0 — a single
 * OSC object has one stripe) and KEY_OFF_RPCSIZE (max RPC size in pages).
 * KEY_LAST_ID and KEY_FIEMAP are forwarded to the OST via a synchronous
 * OST_GET_INFO RPC.  Any other key returns -EINVAL.
 *
 * \param exp     export to query / send the RPC on
 * \param keylen  length of @key
 * \param key     the KEY_* identifier
 * \param vallen  in: size of @val; out: size of the returned value
 * \param val     output buffer
 * \param lsm     striping metadata (unused by the keys handled here)
 */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* single-stripe object: any lock maps to stripe 0 */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
                struct client_obd *cli = &exp->exp_obd->u.cli;
                __u64 *rpcsize = val;
                LASSERT(*vallen == sizeof(__u64));
                /* NOTE(review): returns pages per RPC, not bytes —
                 * callers appear to expect that unit; confirm */
                *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* ask the OST for the last allocated object id */
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* forward a fiemap (file extent mapping) query to the OST */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                int rc;

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);

                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
                                           lustre_swab_fiemap);
                if (reply == NULL) {
                        CERROR("Can't unpack FIEMAP reply.\n");
                        GOTO(out1, rc = -EPROTO);
                }

                memcpy(val, reply, *vallen);

        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
4080
4081 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
4082                                           void *aa, int rc)
4083 {
4084         struct llog_ctxt *ctxt;
4085         struct obd_import *imp = req->rq_import;
4086         ENTRY;
4087
4088         if (rc != 0)
4089                 RETURN(rc);
4090
4091         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4092         if (ctxt) {
4093                 if (rc == 0)
4094                         rc = llog_initiator_connect(ctxt);
4095                 else
4096                         CERROR("cannot establish connection for "
4097                                "ctxt %p: %d\n", ctxt, rc);
4098         }
4099
4100         llog_ctxt_put(ctxt);
4101         spin_lock(&imp->imp_lock);
4102         imp->imp_server_timeout = 1;
4103         imp->imp_pingable = 1;
4104         spin_unlock(&imp->imp_lock);
4105         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4106
4107         RETURN(rc);
4108 }
4109
/* obd_set_info_async() method: set a key/value on this OSC or on the OST.
 *
 * Keys handled purely on the client: KEY_NEXT_ID (advance the object
 * pre-creation cursor), KEY_INIT_RECOV (initial-recovery flag on the
 * import) and KEY_CHECKSUM (toggle bulk checksums).  Everything else is
 * forwarded to the OST in an OST_SET_INFO RPC; KEY_MDS_CONN and
 * KEY_GRANT_SHRINK additionally install reply interpreters.
 *
 * \param exp     export the setting applies to
 * \param keylen  length of @key
 * \param key     the KEY_* identifier
 * \param vallen  length of @val
 * \param val     the value to set
 * \param set     request set the forwarded RPC is added to; may be NULL
 *                only for KEY_GRANT_SHRINK (which goes via ptlrpcd)
 */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                obd_id new_val;
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);

                /* avoid race between allocate new object and set next id
                 * from ll_sync thread */
                spin_lock(&oscc->oscc_lock);
                new_val = *((obd_id*)val) + 1;
                /* never move the pre-create cursor backwards */
                if (new_val > oscc->oscc_next_id)
                        oscc->oscc_next_id = new_val;
                spin_unlock(&oscc->oscc_lock);

                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       oscc->oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                /* normalize any non-zero value to 1 */
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* grant-shrink RPCs go through ptlrpcd; all other forwarded keys
         * need a request set from the caller */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        else if (KEY_IS(KEY_GRANT_SHRINK))
                req->rq_interpret_reply = osc_shrink_grant_interpret;

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* the interpreter needs its own copy of the obdo; it is
                 * freed there (aa_oa ownership moves with the request) */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;

                size[1] = vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                ptlrpcd_add_req(req);
        } else {
                ptlrpc_req_set_repsize(req, 1, NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(set);
        }

        RETURN(0);
}
4209
4210
4211 static struct llog_operations osc_size_repl_logops = {
4212         lop_cancel: llog_obd_repl_cancel
4213 };
4214
/* ops for the MDS->OST originator llog context; zero-initialized here and
 * presumably filled in during module init outside this chunk — confirm
 * before relying on any particular handler being set. */
static struct llog_operations osc_mds_ost_orig_logops;
4216 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4217                          int *index)
4218 {
4219         struct llog_catid catid;
4220         static char name[32] = CATLIST;
4221         int rc;
4222         ENTRY;
4223
4224         LASSERT(index);
4225
4226         mutex_down(&disk_obd->obd_llog_cat_process);
4227
4228         rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4229         if (rc) {
4230                 CERROR("rc: %d\n", rc);
4231                 GOTO(out_unlock, rc);
4232         }
4233 #if 0
4234         CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4235                obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4236                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4237 #endif
4238
4239         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4240                         &catid.lci_logid, &osc_mds_ost_orig_logops);
4241         if (rc) {
4242                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4243                 GOTO (out, rc);
4244         }
4245
4246         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4247                         &osc_size_repl_logops);
4248         if (rc) {
4249                 struct llog_ctxt *ctxt =
4250                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4251                 if (ctxt)
4252                         llog_cleanup(ctxt);
4253                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4254         }
4255 out:
4256         if (rc) {
4257                 CERROR("osc '%s' tgt '%s' rc=%d\n",
4258                        obd->obd_name, disk_obd->obd_name, rc);
4259                 CERROR("logid "LPX64":0x%x\n",
4260                        catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
4261         } else {
4262                 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4263                                        &catid);
4264                 if (rc)
4265                         CERROR("rc: %d\n", rc);
4266         }
4267 out_unlock:
4268         mutex_up(&disk_obd->obd_llog_cat_process);
4269
4270         RETURN(rc);
4271 }
4272
4273 static int osc_llog_finish(struct obd_device *obd, int count)
4274 {
4275         struct llog_ctxt *ctxt;
4276         int rc = 0, rc2 = 0;
4277         ENTRY;
4278
4279         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4280         if (ctxt)
4281                 rc = llog_cleanup(ctxt);
4282
4283         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4284         if (ctxt)
4285                 rc2 = llog_cleanup(ctxt);
4286         if (!rc)
4287                 rc = rc2;
4288
4289         RETURN(rc);
4290 }
4291
/* obd_reconnect() method: recompute the grant to request from the OST
 * when the import reconnects.
 *
 * \param exp        export being reconnected (unused here)
 * \param obd        OSC device
 * \param cluuid     client uuid (unused here)
 * \param data       connect data; ocd_grant is filled in when the
 *                   GRANT feature is negotiated
 * \param localdata  unused
 *
 * \retval 0 always
 */
static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* NOTE(review): "a + b ?: c" is the GNU binary ?:, and '+'
                 * binds tighter, so this requests (avail + dirty), falling
                 * back to two full RPCs worth of pages only when that sum
                 * is zero.  Looks intended, but confirm — the precedence is
                 * easy to misread. */
                data->ocd_grant = cli->cl_avail_grant + cli->cl_dirty ?:
                                2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
                /* lost grant is consumed (reset) once reported */
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
                       "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
                       cli->cl_dirty, cli->cl_avail_grant, lost_grant);
                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant);
        }

        /* NOTE(review): RETURN without a matching ENTRY — the debug-trace
         * nesting will be unbalanced for this function; harmless but worth
         * confirming against the ENTRY/RETURN macro contract. */
        RETURN(0);
}
4319
/* obd_disconnect() method: flush pending size-replication llog cancels on
 * the last disconnect, tear down the client export, and finally remove
 * this client from the grant-shrink list (ordering matters — see the
 * BUG18662 comment below).
 *
 * \retval result of client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
4361
/* obd_import_event() method: react to state changes of this OSC's import
 * (disconnect, inactive, invalidate, active, connect-data received).
 *
 * \param obd    OSC device owning the import
 * \param imp    the import whose state changed (must belong to @obd)
 * \param event  which transition occurred
 *
 * \retval 0 or the observer-notification result; LBUGs on unknown events
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* stop object pre-creation until recovery is done */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                cli = &obd->u.cli;
                /* the server's grant accounting restarts on reconnect */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* drop all locks of this namespace on the client side */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* server is back: allow object creation again */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                CDEBUG(D_INFO, "notify server \n");
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4442
4443 /* determine whether the lock can be canceled before replaying the lock
4444  * during recovery, see bug16774 for detailed information 
4445  *
4446  * return values:
4447  *  zero  - the lock can't be canceled
4448  *  other - ok to cancel
4449  */
4450 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4451 {
4452         check_res_locked(lock->l_resource);
4453         if (lock->l_granted_mode == LCK_GROUP || 
4454             lock->l_resource->lr_type != LDLM_EXTENT)
4455                 RETURN(0);
4456
4457         /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4458         if (lock->l_granted_mode == LCK_PR ||
4459             lock->l_granted_mode == LCK_CR)
4460                 RETURN(1);
4461
4462         RETURN(0);       
4463 }
4464
4465 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4466 {
4467         int rc;
4468         ENTRY;
4469
4470         ENTRY;
4471         rc = ptlrpcd_addref();
4472         if (rc)
4473                 RETURN(rc);
4474
4475         rc = client_obd_setup(obd, len, buf);
4476         if (rc) {
4477                 ptlrpcd_decref();
4478         } else {
4479                 struct lprocfs_static_vars lvars = { 0 };
4480                 struct client_obd *cli = &obd->u.cli;
4481
4482                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4483                 lprocfs_osc_init_vars(&lvars);
4484                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4485                         lproc_osc_attach_seqstat(obd);
4486                         ptlrpc_lprocfs_register_obd(obd);
4487                 }
4488
4489                 oscc_init(obd);
4490                 /* We need to allocate a few requests more, because
4491                    brw_interpret tries to create new requests before freeing
4492                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4493                    reserved, but I afraid that might be too much wasted RAM
4494                    in fact, so 2 is just my guess and still should work. */
4495                 cli->cl_import->imp_rq_pool =
4496                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4497                                             OST_MAXREQSIZE,
4498                                             ptlrpc_add_rqs_to_pool);
4499                 cli->cl_cache = cache_create(obd);
4500                 if (!cli->cl_cache) {
4501                         osc_cleanup(obd);
4502                         rc = -ENOMEM;
4503                 }
4504                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4505                 sema_init(&cli->cl_grant_sem, 1);
4506
4507                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4508         }
4509
4510         RETURN(rc);
4511 }
4512
/* Stage-driven pre-cleanup for an OSC device.  Each stage is invoked in
 * order by the generic obd teardown path; only the stages below do any
 * work here.
 *
 * Returns 0, or the error from obd_llog_finish() at the EXPORTS stage. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                /* cl_sem serializes against connect/disconnect paths that
                 * also touch cl_import. */
                down_write(&obd->u.cli.cl_sem);
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        /* Free the request pool allocated in osc_setup()
                         * before the import itself is destroyed. */
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                up_write(&obd->u.cli.cl_sem);

                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
4558
4559 int osc_cleanup(struct obd_device *obd)
4560 {
4561         int rc;
4562
4563         ENTRY;
4564         ptlrpc_lprocfs_unregister_obd(obd);
4565         lprocfs_obd_cleanup(obd);
4566
4567         /* free memory of osc quota cache */
4568         lquota_cleanup(quota_interface, obd);
4569
4570         cache_destroy(obd->u.cli.cl_cache);
4571         rc = client_obd_cleanup(obd);
4572
4573         ptlrpcd_decref();
4574         RETURN(rc);
4575 }
4576
4577 static int osc_register_page_removal_cb(struct obd_device *obd,
4578                                         obd_page_removal_cb_t func,
4579                                         obd_pin_extent_cb pin_cb)
4580 {
4581         ENTRY;
4582
4583         /* this server - not need init */
4584         if (func == NULL)
4585                 return 0;
4586
4587         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4588                                            pin_cb);
4589 }
4590
4591 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4592                                           obd_page_removal_cb_t func)
4593 {
4594         ENTRY;
4595         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4596 }
4597
4598 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4599                                        obd_lock_cancel_cb cb)
4600 {
4601         ENTRY;
4602         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4603
4604         /* this server - not need init */
4605         if (cb == NULL)
4606                 return 0;
4607
4608         obd->u.cli.cl_ext_lock_cancel_cb = cb;
4609         return 0;
4610 }
4611
4612 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4613                                          obd_lock_cancel_cb cb)
4614 {
4615         ENTRY;
4616
4617         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4618                 CERROR("Unregistering cancel cb %p, while only %p was "
4619                        "registered\n", cb,
4620                        obd->u.cli.cl_ext_lock_cancel_cb);
4621                 RETURN(-EINVAL);
4622         }
4623
4624         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4625         return 0;
4626 }
4627
4628 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4629 {
4630         struct lustre_cfg *lcfg = buf;
4631         struct lprocfs_static_vars lvars = { 0 };
4632         int rc = 0;
4633
4634         lprocfs_osc_init_vars(&lvars);
4635
4636         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4637         return(rc);
4638 }
4639
/* Method table for the OSC device type, registered with
 * class_register_type() in osc_init().  Entries bound to client_*
 * functions use the generic client implementations; the rest are
 * OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_create_async         = osc_create_async,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_get_lock             = osc_get_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
/* Module initialization: hook up the quota interface, register the OSC
 * device type and install the llog origin operations.
 *
 * Returns 0 on success or the error from class_register_type(). */
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc;
        ENTRY;

        lprocfs_osc_init_vars(&lvars);

        /* Best-effort load of the quota module; quota_interface may end
         * up NULL — presumably the lquota_* wrappers tolerate that, but
         * confirm against their definitions. */
        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, lvars.module_vars,
                                 LUSTRE_OSC_NAME);
        if (rc) {
                /* Drop the symbol reference taken above.  NOTE(review):
                 * lquota_exit() is not called on this failure path —
                 * verify whether lquota_init() needs unwinding here. */
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                RETURN(rc);
        }

        /* Start from the lvfs llog ops and override the origin-side
         * methods used by this client. */
        osc_mds_ost_orig_logops = llog_lvfs_ops;
        osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
        osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
        osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
        osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;

        RETURN(rc);
}
4721
4722 #ifdef __KERNEL__
/* Module teardown: undo osc_init() in reverse order. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        /* Release the symbol reference taken via PORTAL_SYMBOL_GET()
         * in osc_init(). */
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
4731
4732 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4733 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4734 MODULE_LICENSE("GPL");
4735
4736 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4737 #endif