Whamcloud - gitweb
dc3481c1108c70121149d20ace9f18c27434d349
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "osp_internal.h"
35
36 struct osp_async_update_args {
37         struct update_request   *oaua_update;
38 };
39
40 struct osp_async_update_item {
41         struct list_head                 oaui_list;
42         struct osp_object               *oaui_obj;
43         void                            *oaui_data;
44         osp_async_update_interpterer_t   oaui_interpterer;
45 };
46
47 static struct osp_async_update_item *
48 osp_async_update_item_init(struct osp_object *obj, void *data,
49                            osp_async_update_interpterer_t interpterer)
50 {
51         struct osp_async_update_item *oaui;
52
53         OBD_ALLOC_PTR(oaui);
54         if (oaui == NULL)
55                 return NULL;
56
57         lu_object_get(osp2lu_obj(obj));
58         INIT_LIST_HEAD(&oaui->oaui_list);
59         oaui->oaui_obj = obj;
60         oaui->oaui_data = data;
61         oaui->oaui_interpterer = interpterer;
62
63         return oaui;
64 }
65
66 static void osp_async_update_item_fini(const struct lu_env *env,
67                                        struct osp_async_update_item *oaui)
68 {
69         LASSERT(list_empty(&oaui->oaui_list));
70
71         lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
72         OBD_FREE_PTR(oaui);
73 }
74
75 static int osp_async_update_interpret(const struct lu_env *env,
76                                       struct ptlrpc_request *req,
77                                       void *arg, int rc)
78 {
79         struct update_reply             *reply  = NULL;
80         struct osp_async_update_args    *oaua   = arg;
81         struct update_request           *update = oaua->oaua_update;
82         struct osp_async_update_item    *oaui;
83         struct osp_async_update_item    *next;
84         int                              count  = 0;
85         int                              index  = 0;
86         int                              rc1    = 0;
87
88         if (rc == 0 || req->rq_repmsg != NULL) {
89                 reply = req_capsule_server_sized_get(&req->rq_pill,
90                                                      &RMF_UPDATE_REPLY,
91                                                      UPDATE_BUFFER_SIZE);
92                 if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
93                         rc1 = -EPROTO;
94                 else
95                         count = reply->ur_count;
96         } else {
97                 rc1 = rc;
98         }
99
100         list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
101                 list_del_init(&oaui->oaui_list);
102                 if (index < count && reply->ur_lens[index] > 0) {
103                         char *ptr = update_get_buf_internal(reply, index, NULL);
104
105                         LASSERT(ptr != NULL);
106
107                         rc1 = le32_to_cpu(*(int *)ptr);
108                 } else {
109                         rc1 = rc;
110                         if (unlikely(rc1 == 0))
111                                 rc1 = -EINVAL;
112                 }
113
114                 oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
115                                        oaui->oaui_data, index, rc1);
116                 osp_async_update_item_fini(env, oaui);
117                 index++;
118         }
119
120         out_destroy_update_req(update);
121
122         return 0;
123 }
124
125 int osp_unplug_async_update(const struct lu_env *env,
126                             struct osp_device *osp,
127                             struct update_request *update)
128 {
129         struct osp_async_update_args    *args;
130         struct ptlrpc_request           *req = NULL;
131         int                              rc;
132
133         rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
134                                  update->ur_buf, UPDATE_BUFFER_SIZE, &req);
135         if (rc != 0) {
136                 struct osp_async_update_item *oaui;
137                 struct osp_async_update_item *next;
138
139                 list_for_each_entry_safe(oaui, next,
140                                          &update->ur_cb_items, oaui_list) {
141                         list_del_init(&oaui->oaui_list);
142                         oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
143                                                oaui->oaui_data, 0, rc);
144                         osp_async_update_item_fini(env, oaui);
145                 }
146                 out_destroy_update_req(update);
147         } else {
148                 LASSERT(list_empty(&update->ur_list));
149
150                 args = ptlrpc_req_async_args(req);
151                 args->oaua_update = update;
152                 req->rq_interpret_reply = osp_async_update_interpret;
153                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
154         }
155
156         return rc;
157 }
158
159 /* with osp::opd_async_requests_mutex held */
160 struct update_request *
161 osp_find_or_create_async_update_request(struct osp_device *osp)
162 {
163         struct update_request *update = osp->opd_async_requests;
164
165         if (update != NULL)
166                 return update;
167
168         update = out_create_update_req(&osp->opd_dt_dev);
169         if (!IS_ERR(update))
170                 osp->opd_async_requests = update;
171
172         return update;
173 }
174
175 /* with osp::opd_async_requests_mutex held */
176 int osp_insert_async_update(const struct lu_env *env,
177                             struct update_request *update, int op,
178                             struct osp_object *obj, int count,
179                             int *lens, const char **bufs, void *data,
180                             osp_async_update_interpterer_t interpterer)
181 {
182         struct osp_async_update_item *oaui;
183         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
184         int                           rc  = 0;
185         ENTRY;
186
187         oaui = osp_async_update_item_init(obj, data, interpterer);
188         if (oaui == NULL)
189                 RETURN(-ENOMEM);
190
191 again:
192         rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
193                                count, lens, bufs);
194         if (rc == -E2BIG) {
195                 osp->opd_async_requests = NULL;
196                 mutex_unlock(&osp->opd_async_requests_mutex);
197
198                 rc = osp_unplug_async_update(env, osp, update);
199                 mutex_lock(&osp->opd_async_requests_mutex);
200                 if (rc != 0)
201                         GOTO(out, rc);
202
203                 update = osp_find_or_create_async_update_request(osp);
204                 if (IS_ERR(update))
205                         GOTO(out, rc = PTR_ERR(update));
206
207                 goto again;
208         }
209
210         if (rc == 0)
211                 list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
212
213         GOTO(out, rc);
214
215 out:
216         if (rc != 0)
217                 osp_async_update_item_fini(env, oaui);
218
219         return rc;
220 }
221
222 /**
223  * If the transaction creation goes to OSP, it means the update
224  * in this transaction only includes remote UPDATE. It is only
225  * used by LFSCK right now.
226  **/
227 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
228 {
229         struct thandle *th = NULL;
230         struct thandle_update *tu = NULL;
231         int rc = 0;
232
233         OBD_ALLOC_PTR(th);
234         if (unlikely(th == NULL))
235                 GOTO(out, rc = -ENOMEM);
236
237         th->th_dev = d;
238         th->th_tags = LCT_TX_HANDLE;
239         atomic_set(&th->th_refc, 1);
240         th->th_alloc_size = sizeof(*th);
241
242         OBD_ALLOC_PTR(tu);
243         if (tu == NULL)
244                 GOTO(out, rc = -ENOMEM);
245
246         INIT_LIST_HEAD(&tu->tu_remote_update_list);
247         tu->tu_only_remote_trans = 1;
248         th->th_update = tu;
249
250 out:
251         if (rc != 0) {
252                 if (tu != NULL)
253                         OBD_FREE_PTR(tu);
254                 if (th != NULL)
255                         OBD_FREE_PTR(th);
256                 th = ERR_PTR(rc);
257         }
258
259         return th;
260 }
261
262 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
263                              struct update_request *update, struct thandle *th)
264 {
265         struct thandle_update   *tu = th->th_update;
266         int                     rc = 0;
267
268         LASSERT(tu != NULL);
269
270         /* If the transaction only includes remote update, it should
271          * still be asynchronous */
272         if (is_only_remote_trans(th)) {
273                 struct osp_async_update_args    *args;
274                 struct ptlrpc_request           *req;
275
276                 list_del_init(&update->ur_list);
277                 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
278                                          update->ur_buf,
279                                          UPDATE_BUFFER_SIZE, &req);
280                 if (rc == 0) {
281                         args = ptlrpc_req_async_args(req);
282                         args->oaua_update = update;
283                         req->rq_interpret_reply =
284                                 osp_async_update_interpret;
285                         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
286                 } else {
287                         out_destroy_update_req(update);
288                 }
289         } else {
290                 /* Before we support async update, the cross MDT transaction
291                  * has to been synchronized */
292                 th->th_sync = 1;
293                 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
294                                      update, NULL);
295         }
296
297         return rc;
298 }
299
300 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
301                     struct thandle *th)
302 {
303         struct thandle_update *tu = th->th_update;
304         struct update_request *update;
305         int rc = 0;
306
307         if (tu == NULL)
308                 return rc;
309
310         /* Check whether there are updates related with this OSP */
311         update = out_find_update(tu, dt);
312         if (update == NULL)
313                 return rc;
314
315         /* Note: some updates needs to send before local transaction,
316          * some needs to send after local transaction.
317          *
318          * If the transaction only includes remote updates, it will
319          * send updates to remote MDT in osp_trans_stop.
320          *
321          * If it is remote create, it will send the remote req after
322          * local transaction. i.e. create the object locally first,
323          * then insert the name entry.
324          *
325          * If it is remote unlink, it will send the remote req before
326          * the local transaction, i.e. delete the name entry remote
327          * first, then destroy the local object. */
328         if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
329                 rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th);
330
331         return rc;
332 }
333
334 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
335                    struct thandle *th)
336 {
337         struct thandle_update   *tu = th->th_update;
338         struct update_request   *update;
339         int rc = 0;
340
341         LASSERT(tu != NULL);
342         /* Check whether there are updates related with this OSP */
343         update = out_find_update(tu, dt);
344         if (update == NULL) {
345                 if (!is_only_remote_trans(th))
346                         return rc;
347                 goto put;
348         }
349
350         if (update->ur_buf->ub_count == 0) {
351                 out_destroy_update_req(update);
352                 goto put;
353         }
354
355         if (is_only_remote_trans(th)) {
356                 if (th->th_result == 0) {
357                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
358                                                update, th);
359                 } else {
360                         rc = th->th_result;
361                         out_destroy_update_req(update);
362                 }
363         } else {
364                 if (tu->tu_sent_after_local_trans)
365                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
366                                                update, th);
367                 rc = update->ur_rc;
368                 out_destroy_update_req(update);
369         }
370
371 put:
372         thandle_put(th);
373         return rc;
374 }