/*
 * Copyright 2022 Amazon.com, Inc. or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package software.amazon.lambda.powertools.utilities;

import com.amazonaws.services.lambda.runtime.events.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;
import static software.amazon.lambda.powertools.utilities.jmespath.Base64Function.decode;
import static software.amazon.lambda.powertools.utilities.jmespath.Base64GZipFunction.decompress;

/**
 * Class that can be used to extract the meaningful part of an event and deserialize it into a Java object.<br/>
 * For example, extract the body of an API Gateway event, or messages from an SQS event.
 */
public class EventDeserializer {

    private static final Logger LOG = LoggerFactory.getLogger(EventDeserializer.class);

    /**
     * Extract the meaningful part of a Lambda Event object. Main events are built-in:
     * <ul>
     *     <li>{@link APIGatewayProxyRequestEvent}      -> body</li>
     *     <li>{@link APIGatewayV2HTTPEvent}            -> body</li>
     *     <li>{@link SNSEvent}                         -> Records[0].Sns.Message</li>
     *     <li>{@link SQSEvent}                         -> Records[*].body <i>(list)</i></li>
     *     <li>{@link ScheduledEvent}                   -> detail</li>
     *     <li>{@link ApplicationLoadBalancerRequestEvent} -> body</li>
     *     <li>{@link CloudWatchLogsEvent}              -> powertools_base64_gzip(data)</li>
     *     <li>{@link CloudFormationCustomResourceEvent} -> resourceProperties</li>
     *     <li>{@link KinesisEvent}                     -> Records[*].kinesis.powertools_base64(data) <i>(list)</i></li>
     *     <li>{@link KinesisFirehoseEvent}             -> Records[*].powertools_base64(data) <i>(list)</i></li>
     *     <li>{@link KafkaEvent}                       -> records[*].values[*].powertools_base64(value) <i>(list)</i></li>
     *     <li>{@link ActiveMQEvent}                    -> messages[*].powertools_base64(data) <i>(list)</i></li>
     *     <li>{@link RabbitMQEvent}                    -> rmqMessagesByQueue[*].values[*].powertools_base64(data) <i>(list)</i></li>
     *     <li>{@link KinesisAnalyticsFirehoseInputPreprocessingEvent} -> Records[*].kinesis.powertools_base64(data) <i>(list)</i></li>
     *     <li>{@link KinesisAnalyticsStreamsInputPreprocessingEvent} > Records[*].kinesis.powertools_base64(data) <i>(list)</i></li>
     *     <li>{@link String}</li>
     *     <li>{@link Map}</li>
     * </ul>
     * To be used in conjunction with {@link EventPart#as(Class)} or {@link EventPart#asListOf(Class)}
     * for the deserialization.
     *
     * @param object the event of your Lambda function handler method
     * @return the part of the event which is meaningful (ex: body of the API Gateway).<br/>
     */
    public static EventPart extractDataFrom(Object object) {
        if (object instanceof String) {
            return new EventPart((String) object);
        } else if (object instanceof Map) {
            return new EventPart((Map<String, Object>) object);
        } else if (object instanceof APIGatewayProxyRequestEvent) {
            APIGatewayProxyRequestEvent event = (APIGatewayProxyRequestEvent) object;
            return new EventPart(event.getBody());
        } else if (object instanceof APIGatewayV2HTTPEvent) {
            APIGatewayV2HTTPEvent event = (APIGatewayV2HTTPEvent) object;
            return new EventPart(event.getBody());
        } else if (object instanceof SNSEvent) {
            SNSEvent event = (SNSEvent) object;
            return new EventPart(event.getRecords().get(0).getSNS().getMessage());
        } else if (object instanceof SQSEvent) {
            SQSEvent event = (SQSEvent) object;
            return new EventPart(event.getRecords().stream()
                    .map(SQSEvent.SQSMessage::getBody)
                    .collect(Collectors.toList()));
        } else if (object instanceof ScheduledEvent) {
            ScheduledEvent event = (ScheduledEvent) object;
            return new EventPart(event.getDetail());
        } else if (object instanceof ApplicationLoadBalancerRequestEvent) {
            ApplicationLoadBalancerRequestEvent event = (ApplicationLoadBalancerRequestEvent) object;
            return new EventPart(event.getBody());
        } else if (object instanceof CloudWatchLogsEvent) {
            CloudWatchLogsEvent event = (CloudWatchLogsEvent) object;
            return new EventPart(decompress(decode(event.getAwsLogs().getData().getBytes(UTF_8))));
        } else if (object instanceof CloudFormationCustomResourceEvent) {
            CloudFormationCustomResourceEvent event = (CloudFormationCustomResourceEvent) object;
            return new EventPart(event.getResourceProperties());
        } else if (object instanceof KinesisEvent) {
            KinesisEvent event = (KinesisEvent) object;
            return new EventPart(event.getRecords().stream()
                    .map(r -> decode(r.getKinesis().getData()))
                    .collect(Collectors.toList()));
        } else if (object instanceof KinesisFirehoseEvent) {
            KinesisFirehoseEvent event = (KinesisFirehoseEvent) object;
            return new EventPart(event.getRecords().stream()
                    .map(r -> decode(r.getData()))
                    .collect(Collectors.toList()));
        } else if (object instanceof KafkaEvent) {
            KafkaEvent event = (KafkaEvent) object;
            return new EventPart(event.getRecords().values().stream()
                    .flatMap(List::stream)
                    .map(r -> decode(r.getValue()))
                    .collect(Collectors.toList()));
        } else if (object instanceof ActiveMQEvent) {
            ActiveMQEvent event = (ActiveMQEvent) object;
            return new EventPart(event.getMessages().stream()
                    .map(m -> decode(m.getData()))
                    .collect(Collectors.toList()));
        } else if (object instanceof RabbitMQEvent) {
            RabbitMQEvent event = (RabbitMQEvent) object;
            return new EventPart(event.getRmqMessagesByQueue().values().stream()
                    .flatMap(List::stream)
                    .map(r -> decode(r.getData()))
                    .collect(Collectors.toList()));
        } else if (object instanceof KinesisAnalyticsFirehoseInputPreprocessingEvent) {
            KinesisAnalyticsFirehoseInputPreprocessingEvent event = (KinesisAnalyticsFirehoseInputPreprocessingEvent) object;
            return new EventPart(event.getRecords().stream()
                    .map(r -> decode(r.getData()))
                    .collect(Collectors.toList()));
        } else if (object instanceof KinesisAnalyticsStreamsInputPreprocessingEvent) {
            KinesisAnalyticsStreamsInputPreprocessingEvent event = (KinesisAnalyticsStreamsInputPreprocessingEvent) object;
            return new EventPart(event.getRecords().stream()
                    .map(r -> decode(r.getData()))
                    .collect(Collectors.toList()));
        } else {
            // does not really make sense to use this EventDeserializer when you already have a typed object
            // just not to throw an exception
            LOG.warn("Consider using your object directly instead of using EventDeserializer");
            return new EventPart(object);
        }
    }

    /**
     * Meaningful part of a Lambda event.<br/>
     * Use {@link #extractDataFrom(Object)} to retrieve an instance of this class.
     */
    public static class EventPart {
        private Map<String, Object> contentMap;
        private String content;
        private List<String> contentList;
        private Object contentObject;

        private EventPart(List<String> contentList) {
            this.contentList = contentList;
        }

        private EventPart(String content) {
            this.content = content;
        }

        private EventPart(Map<String, Object> contentMap) {
            this.contentMap = contentMap;
        }

        private EventPart(Object content) {
            this.contentObject = content;
        }

        /**
         * Deserialize this part of event from JSON to an object of type T
         * @param clazz the target type for deserialization
         * @param <T> type of object to return
         * @return an Object of type T (deserialized from the content)
         */
        public <T> T as(Class<T> clazz) {
            try {
                if (content != null) {
                    if (content.getClass().equals(clazz)) {
                        // do not read json when returning String, just return the String
                        return (T) content;
                    }
                    return JsonConfig.get().getObjectMapper().reader().readValue(content, clazz);
                }
                if (contentMap != null) {
                    return JsonConfig.get().getObjectMapper().convertValue(contentMap, clazz);
                }
                if (contentObject != null) {
                    return (T) contentObject;
                }
                if (contentList != null) {
                    throw new EventDeserializationException("The content of this event is a list, consider using 'asListOf' instead");
                }
                // should not occur, except if the event is malformed (missing fields)
                throw new IllegalStateException("Event content is null");
            } catch (IOException e) {
                throw new EventDeserializationException("Cannot load the event as " + clazz.getSimpleName(), e);
            }
        }

        /**
         * Deserialize this part of event from JSON to a list of objects of type T
         * @param clazz the target type for deserialization
         * @param <T> type of object to return
         * @return a list of objects of type T (deserialized from the content)
         */
        public <T> List<T> asListOf(Class<T> clazz) {
            if (contentList == null) {
                if (content != null || contentMap != null || contentObject != null) {
                    throw new EventDeserializationException("The content of this event is not a list, consider using 'as' instead");
                }
                // should not occur, except if the event is really malformed
                throw new IllegalStateException("Event content is null");
            }
            return contentList.stream().map(s -> {
                try {
                    return s == null ? null : JsonConfig.get().getObjectMapper().reader().readValue(s, clazz);
                } catch (IOException e) {
                    throw new EventDeserializationException("Cannot load the event as " + clazz.getSimpleName(), e);
                }
            }).collect(Collectors.toList());
        }
    }
}