Whamcloud - gitweb
LU-1267 lfsck: enhance API for MDT-OST consistency
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "osp_internal.h"
35
36 struct osp_async_update_args {
37         struct update_request   *oaua_update;
38 };
39
40 struct osp_async_update_item {
41         struct list_head                 oaui_list;
42         struct osp_object               *oaui_obj;
43         void                            *oaui_data;
44         osp_async_update_interpterer_t   oaui_interpterer;
45 };
46
47 static struct osp_async_update_item *
48 osp_async_update_item_init(struct osp_object *obj, void *data,
49                            osp_async_update_interpterer_t interpterer)
50 {
51         struct osp_async_update_item *oaui;
52
53         OBD_ALLOC_PTR(oaui);
54         if (oaui == NULL)
55                 return NULL;
56
57         lu_object_get(osp2lu_obj(obj));
58         INIT_LIST_HEAD(&oaui->oaui_list);
59         oaui->oaui_obj = obj;
60         oaui->oaui_data = data;
61         oaui->oaui_interpterer = interpterer;
62
63         return oaui;
64 }
65
66 static void osp_async_update_item_fini(const struct lu_env *env,
67                                        struct osp_async_update_item *oaui)
68 {
69         LASSERT(list_empty(&oaui->oaui_list));
70
71         lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
72         OBD_FREE_PTR(oaui);
73 }
74
75 static int osp_async_update_interpret(const struct lu_env *env,
76                                       struct ptlrpc_request *req,
77                                       void *arg, int rc)
78 {
79         struct update_reply             *reply  = NULL;
80         struct osp_async_update_args    *oaua   = arg;
81         struct update_request           *update = oaua->oaua_update;
82         struct osp_async_update_item    *oaui;
83         struct osp_async_update_item    *next;
84         int                              count  = 0;
85         int                              index  = 0;
86         int                              rc1    = 0;
87
88         if (rc == 0 || req->rq_repmsg != NULL) {
89                 reply = req_capsule_server_sized_get(&req->rq_pill,
90                                                      &RMF_UPDATE_REPLY,
91                                                      UPDATE_BUFFER_SIZE);
92                 if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
93                         rc1 = -EPROTO;
94                 else
95                         count = reply->ur_count;
96         } else {
97                 rc1 = rc;
98         }
99
100         list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
101                 list_del_init(&oaui->oaui_list);
102                 if (index < count && reply->ur_lens[index] > 0) {
103                         char *ptr = update_get_buf_internal(reply, index, NULL);
104
105                         LASSERT(ptr != NULL);
106
107                         rc1 = le32_to_cpu(*(int *)ptr);
108                 } else {
109                         rc1 = rc;
110                         if (unlikely(rc1 == 0))
111                                 rc1 = -EINVAL;
112                 }
113
114                 oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
115                                        oaui->oaui_data, index, rc1);
116                 osp_async_update_item_fini(env, oaui);
117                 index++;
118         }
119
120         out_destroy_update_req(update);
121
122         return 0;
123 }
124
125 int osp_unplug_async_update(const struct lu_env *env,
126                             struct osp_device *osp,
127                             struct update_request *update)
128 {
129         struct osp_async_update_args    *args;
130         struct ptlrpc_request           *req = NULL;
131         int                              rc;
132
133         rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
134                                  update->ur_buf, UPDATE_BUFFER_SIZE, &req);
135         if (rc != 0) {
136                 struct osp_async_update_item *oaui;
137                 struct osp_async_update_item *next;
138
139                 list_for_each_entry_safe(oaui, next,
140                                          &update->ur_cb_items, oaui_list) {
141                         list_del_init(&oaui->oaui_list);
142                         oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
143                                                oaui->oaui_data, 0, rc);
144                         osp_async_update_item_fini(env, oaui);
145                 }
146                 out_destroy_update_req(update);
147         } else {
148                 LASSERT(list_empty(&update->ur_list));
149
150                 args = ptlrpc_req_async_args(req);
151                 args->oaua_update = update;
152                 req->rq_interpret_reply = osp_async_update_interpret;
153                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
154         }
155
156         return rc;
157 }
158
159 /* with osp::opd_async_requests_mutex held */
160 struct update_request *
161 osp_find_or_create_async_update_request(struct osp_device *osp)
162 {
163         struct update_request *update = osp->opd_async_requests;
164
165         if (update != NULL)
166                 return update;
167
168         update = out_create_update_req(&osp->opd_dt_dev);
169         if (!IS_ERR(update))
170                 osp->opd_async_requests = update;
171
172         return update;
173 }
174
175 /* with osp::opd_async_requests_mutex held */
176 int osp_insert_async_update(const struct lu_env *env,
177                             struct update_request *update, int op,
178                             struct osp_object *obj, int count,
179                             int *lens, const char **bufs, void *data,
180                             osp_async_update_interpterer_t interpterer)
181 {
182         struct osp_async_update_item *oaui;
183         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
184         int                           rc  = 0;
185         ENTRY;
186
187         oaui = osp_async_update_item_init(obj, data, interpterer);
188         if (oaui == NULL)
189                 RETURN(-ENOMEM);
190
191 again:
192         rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
193                                count, lens, bufs);
194         if (rc == -E2BIG) {
195                 osp->opd_async_requests = NULL;
196                 mutex_unlock(&osp->opd_async_requests_mutex);
197
198                 rc = osp_unplug_async_update(env, osp, update);
199                 mutex_lock(&osp->opd_async_requests_mutex);
200                 if (rc != 0)
201                         GOTO(out, rc);
202
203                 update = osp_find_or_create_async_update_request(osp);
204                 if (IS_ERR(update))
205                         GOTO(out, rc = PTR_ERR(update));
206
207                 goto again;
208         }
209
210         if (rc == 0)
211                 list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
212
213         GOTO(out, rc);
214
215 out:
216         if (rc != 0)
217                 osp_async_update_item_fini(env, oaui);
218
219         return rc;
220 }
221
222 struct thandle *osp_trans_create(const struct lu_env *env,
223                                  struct dt_device *d)
224 {
225         struct thandle *th;
226
227         OBD_ALLOC_PTR(th);
228         if (unlikely(th == NULL))
229                 return ERR_PTR(-ENOMEM);
230
231         th->th_dev = d;
232         th->th_tags = LCT_TX_HANDLE;
233         INIT_LIST_HEAD(&th->th_remote_update_list);
234
235         return th;
236 }
237
238 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
239                              struct thandle *th)
240 {
241         struct update_request   *update = th->th_current_request;
242         int                      rc     = 0;
243
244         if (unlikely(update == NULL || update->ur_buf == NULL ||
245                      update->ur_buf->ub_count == 0))
246                 return 0;
247
248         if (is_remote_trans(th)) {
249                 struct osp_async_update_args    *args;
250                 struct ptlrpc_request           *req;
251
252                 list_del_init(&update->ur_list);
253                 th->th_current_request = NULL;
254                 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
255                                          update->ur_buf,
256                                          UPDATE_BUFFER_SIZE, &req);
257                 if (rc == 0) {
258                         args = ptlrpc_req_async_args(req);
259                         args->oaua_update = update;
260                         req->rq_interpret_reply =
261                                 osp_async_update_interpret;
262                         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
263                 } else {
264                         out_destroy_update_req(update);
265                 }
266         } else {
267                 th->th_sync = 1;
268                 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
269                                      update, NULL);
270         }
271
272         return rc;
273 }
274
275 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
276                     struct thandle *th)
277 {
278         int rc = 0;
279
280         if (!is_remote_trans(th))
281                 rc = osp_trans_trigger(env, dt2osp_dev(dt), th);
282
283         return rc;
284 }
285
286 int osp_trans_stop(const struct lu_env *env, struct thandle *th)
287 {
288         struct update_request   *update = th->th_current_request;
289         int                      rc     = 0;
290
291         if (is_remote_trans(th)) {
292                 LASSERT(update == NULL);
293
294                 update = out_find_update(th, th->th_dev);
295                 th->th_current_request = update;
296                 if (th->th_result == 0)
297                         rc = osp_trans_trigger(env, dt2osp_dev(th->th_dev), th);
298                 else
299                         rc = th->th_result;
300
301                 if (th->th_current_request != NULL)
302                         out_destroy_update_req(update);
303
304                 OBD_FREE_PTR(th);
305         } else {
306                 LASSERT(update != NULL);
307
308                 rc = update->ur_rc;
309                 out_destroy_update_req(update);
310                 th->th_current_request = NULL;
311         }
312
313         return rc;
314 }