diff --git a/.github/workflows/create-prerelease.yml b/.github/workflows/create-prerelease.yml index c036ff2..68d1f94 100644 --- a/.github/workflows/create-prerelease.yml +++ b/.github/workflows/create-prerelease.yml @@ -31,7 +31,7 @@ jobs: run: | echo "::set-output name=version::$(date +'%Y-%m-%d' --utc)" - - name: Create Stable Release + name: Create Pre-Release id: create_release uses: actions/create-release@v1 env: diff --git a/Dockerfile b/Dockerfile index d96fb27..61df7eb 100755 --- a/Dockerfile +++ b/Dockerfile @@ -3,17 +3,22 @@ FROM ubuntu:20.04 USER root WORKDIR /root +# RUN alias apt_install="DEBIAN_FRONTEND=noninteractive apt update -y && DEBIAN_FRONTEND=noninteractive apt install -y" +# RUN alias apt_clean="apt clean && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*" + +COPY --chmod=777 apt_install /apt_install + # install utility software packages -RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y software-properties-common&& rm -rf /var/cache/apt/archives /var/lib/apt/lists/* -RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y inetutils-ping net-tools wget && rm -rf /var/cache/apt/archives /var/lib/apt/lists/* -RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y htop screen zip nano && rm -rf /var/cache/apt/archives /var/lib/apt/lists/* +RUN /apt_install software-properties-common +RUN /apt_install inetutils-ping net-tools wget +RUN /apt_install htop screen zip nano # install and configure git -RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y git && rm -rf /var/cache/apt/archives /var/lib/apt/lists/* +RUN /apt_install git RUN DEBIAN_FRONTEND=noninteractive git config --global commit.gpgsign false # configure ssh daemon -RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y openssh-server && rm -rf /var/cache/apt/archives /var/lib/apt/lists/* +RUN /apt_install openssh-server RUN if ! [ -d /var/run/sshd ]; then mkdir /var/run/sshd; fi RUN echo 'root:password!!' 
| chpasswd RUN sed -i 's/^[# ]*PermitRootLogin .*$/PermitRootLogin yes/g' /etc/ssh/sshd_config @@ -41,9 +46,9 @@ RUN echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/' >> ~/.bashrc RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y python3 python3-pip python3-dev && rm -rf /var/cache/apt/archives /var/lib/apt/lists/* # install PyCOMPSs -RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y graphviz xdg-utils libtool automake build-essential \ +RUN /apt_install graphviz xdg-utils libtool automake build-essential \ python python-dev libpython2.7 libboost-serialization-dev libboost-iostreams-dev libxml2 libxml2-dev csh gfortran \ - libgmp3-dev flex bison texinfo libpapi-dev && rm -rf /var/cache/apt/archives /var/lib/apt/lists/* + libgmp3-dev flex bison texinfo libpapi-dev RUN python3 -m pip install --upgrade pip setuptools RUN python3 -m pip install dill guppy3 RUN python3 -m pip install "pycompss==3.1" -v diff --git a/apt_install b/apt_install new file mode 100644 index 0000000..3fd2097 --- /dev/null +++ b/apt_install @@ -0,0 +1,13 @@ +#!/bin/bash + +# script used for installing software through apt in Dockerfiles, avoiding layer cache and size problems + +# update packages lists +DEBIAN_FRONTEND=noninteractive apt update -y + +# install required software +DEBIAN_FRONTEND=noninteractive apt install -y $@ + +# clean apt cache and lists +DEBIAN_FRONTEND=noninteractive apt clean +rm -rf /var/cache/apt/archives /var/lib/apt/lists/* \ No newline at end of file diff --git a/parsoda/function/analysis/parallel_fp_growth.py b/parsoda/function/analysis/parallel_fp_growth.py deleted file mode 100644 index fb788ef..0000000 --- a/parsoda/function/analysis/parallel_fp_growth.py +++ /dev/null @@ -1,57 +0,0 @@ -import pandas -from pycompss.dds import DDS -#from ddf_library.ddf import DDF -#from ddf_library.functions.ml.fpm import FPGrowth, AssociationRules - -from parsoda_costantino.common.abstract_analysis_function import 
AbstractAnalysisFunction - -class ParallelFPGrowth(AbstractAnalysisFunction): - - def __init__(self): - AbstractAnalysisFunction.__init__(self) - self.min_support = None - self.association_rules = None - self.ass_rules_min_confidence = 0.2 #default - - def set_options(self, options): - if options is not None: - self.min_support = float(options.get_option("min_support")) - self.association_rules = options.get_option("association_rules") - self.ass_rules_min_confidence = float(options.get_option("ass_rules_min_confidence")) - - def run(self, driver, data): - prepared_data = prepare_data(data) - data_frame = pandas.DataFrame({"sequences": prepared_data}) - data_set = DDS().parallelize(data_frame) - fp = FPGrowth('sequences', self.min_support) - fp.run(data_set) - result = fp.get_frequent_itemsets().toDF().values.tolist() - lenD = len(prepared_data) - freq_itms = [] - for poi in result: - support_poi = float("{0:.4f}".format(poi[1]/float(lenD))) - freq_itms.append((poi[0],support_poi)) - - rules = None - if self.association_rules == "yes": - rules = AssociationRules(confidence=self.ass_rules_min_confidence).run(data_set) - - return freq_itms, rules - - - -def prepare_data(data): - new_data = [] - for a in data: - d = a[1] # type: dict - new_data.extend(list(d.values())) - prepared_data = [] - for a in new_data: - result = [] - i = 1 - for e in a: - if e not in result: - result.append(e) - i += 1 - prepared_data.append(result) - return prepared_data diff --git a/parsoda/function/analysis/sequential_fp_growth.py b/parsoda/function/analysis/sequential_fp_growth.py deleted file mode 100644 index a9a2a86..0000000 --- a/parsoda/function/analysis/sequential_fp_growth.py +++ /dev/null @@ -1,38 +0,0 @@ -# coding=utf-8 -from pyfpgrowth import pyfpgrowth - -from parsoda.model import Analyzer - - -class SequentialFPGrowth(Analyzer): - - def __init__(self, min_support: float, association_rules: dict, association_rules_min_confidence: float = 0.2): - self.min_support = min_support - 
self.association_rules = association_rules - self.ass_rules_min_confidence = association_rules_min_confidence # default - - def analyze(self, driver, data): - """ - l'input è un insieme di traiettorie, l'output è una tupla formata da (l'insieme di itemset con relativo - supporto e le regole associative ottenute, nel caso il parametro association_rule sia diverso da 'yes' - viene ritornata una tupla il cui secondo elemento è nullo - """ - # prepare data - prepared_data = [] - for a in data: - d = a[1] # type: dict - prepared_data.extend(list(d.values())) - - # run the algorithm - support = int(len(prepared_data) * self.min_support) - patterns = pyfpgrowth.find_frequent_patterns(prepared_data, support) - lenD = len(prepared_data) - freq_itms = {} - for poi in list(patterns.keys()): - support_poi = float("{0:.4f}".format(patterns[poi] / float(lenD))) - freq_itms[poi] = support_poi - - rules = None - if self.association_rules == "yes": - rules = pyfpgrowth.generate_association_rules(patterns, self.ass_rules_min_confidence) - return freq_itms, rules diff --git a/parsoda/model/driver/parsoda_driver.py b/parsoda/model/driver/parsoda_driver.py index 89d6e1e..af922a8 100644 --- a/parsoda/model/driver/parsoda_driver.py +++ b/parsoda/model/driver/parsoda_driver.py @@ -30,7 +30,6 @@ def init_environment(self) -> None: def set_num_partitions(self, num_partitions: int) -> None: """ Sets the number of data partitions - :return: None """ pass @@ -38,7 +37,6 @@ def set_num_partitions(self, num_partitions: int) -> None: def set_chunk_size(self, chunk_size: int) -> None: """ Sets the size of data partitions in bytes - :return: None """ pass @@ -55,7 +53,6 @@ def crawl(self, crawler: List[Crawler]) -> None: After invoking this function the implementor should hold a representation of an initial dataset (e.g., on Spark a new RDD is populated with the SocialDataItem objects provided by crawlers) - :return: None """ pass @@ -63,8 +60,9 @@ def crawl(self, crawler: List[Crawler]) -> None: 
def filter(self, filter_func: Callable[[Any], bool]) -> None: """ Applies the given filter to the current dataset, dropping all items that does not satisfy the filter - :param filter_func: the filter to apply - :return: None + + Args: + filter_func: the filter to apply """ pass @@ -72,36 +70,43 @@ def filter(self, filter_func: Callable[[Any], bool]) -> None: def flatmap(self, mapper: Callable[[Any], Iterable[Any]]) -> None: """ Executes a mapping of each item to a list of custom key-value pairs, represented as tuples of two elements each - :param mapper: the (object -> list[(K,V)]) mapping function to apply - :return: None + + Args: + mapper: the (object -> list[(K,V)]) mapping function to apply """ pass def map(self, mapper: Callable[[Any], Any]) -> None: """ - Executes a mapping of each item in the current dataset to a new object - :param mapper: the (object -> list[(K,V)]) mapping function to apply - :return: None + Executes a mapping of each item in the current dataset to a new object. + + Args: + mapper: the (object -> list[(K,V)]) mapping function to apply """ self.flatmap(_flatmapper(mapper)) #TODO: documentation def group_by_key(self) -> None: - """Assumes that the current dataset is a bulk of key-value pairs and creates a new dataset which groups all the items with the same key. The new dataset will be a bulk of (key)-(list-of-values) pairs. + """Assumes that the current dataset is a bulk of key-value pairs + and creates a new dataset which groups all the items with the same key. + The new dataset will be a bulk of (key)-(list-of-values) pairs. 
""" pass def get_result(self) -> Any: """ Gets the current dataset - :return: the current dataset + + Returns: + Any: the current dataset """ pass @abstractmethod def dispose_environment(self) -> None: """ - Disposes instantiated resources of the underlying environment, after executing the ParSoDA application, in order to reuse this driver as a new fresh driver that should be re-initialized - :return: None + Disposes instantiated resources of the underlying environment, + after executing the ParSoDA application, in order to reuse + this driver as a new fresh driver that should be re-initialized """ pass diff --git a/parsoda/model/function/analyzer.py b/parsoda/model/function/analyzer.py index 40ff6cc..7675060 100644 --- a/parsoda/model/function/analyzer.py +++ b/parsoda/model/function/analyzer.py @@ -15,4 +15,15 @@ class Analyzer(ABC, Generic[K, R, A]): @abstractmethod def analyze(self, driver: ParsodaDriver, data: Dict[K, R]) -> A: + """Applies an analysis algorithm to the output data from reduction step. + The analyzer might be a sequential, parallel or distributed algorithm. + In the latter case, the algorithm would use the same driver used by the current application for running a new, nested, ParSoDA application. + + Args: + driver (ParsodaDriver): the driver used during the execution of the parallel phase + data (Dict[K, R]): output data from reducton step organized as a dictionary of key-value pairs + + Returns: + A: the outputdata type from the analysis + """ pass \ No newline at end of file diff --git a/parsoda/model/function/crawler.py b/parsoda/model/function/crawler.py index 5b4bcd6..1824bd4 100644 --- a/parsoda/model/function/crawler.py +++ b/parsoda/model/function/crawler.py @@ -64,6 +64,11 @@ def get_partitions(self, num_of_partitions=0, partition_size=1024*1024*1024) -> @abstractmethod def supports_remote_partitioning(self) -> bool: + """Checks if the crawler supports remote partitioning, i.e. 
the ability to read data directly from the worker nodes + + Returns: + bool: true if the crawler supports remote partitioning of data source. + """ pass diff --git a/parsoda/model/function/filter.py b/parsoda/model/function/filter.py index 6cf2cad..d528870 100644 --- a/parsoda/model/function/filter.py +++ b/parsoda/model/function/filter.py @@ -10,9 +10,12 @@ class Filter(ABC): @abstractmethod def test(self, item: SocialDataItem) -> bool: - """ - Test if the item satisfies the predicate of the filter - :param item: the item to test - :return: True if the item satisfies the predicate, False otherwise - """ + """Test if the item satisfies the predicate of the filter + + Args: + item (SocialDataItem): the item to test + + Returns: + bool: True if the item satisfies the predicate, False otherwise + """ pass \ No newline at end of file diff --git a/parsoda/model/function/mapper.py b/parsoda/model/function/mapper.py index 0f2fc37..b602831 100644 --- a/parsoda/model/function/mapper.py +++ b/parsoda/model/function/mapper.py @@ -14,10 +14,13 @@ class Mapper(ABC, Generic[K, V]): @abstractmethod def map(self, item: SocialDataItem) -> Iterable[Tuple[K, V]]: - """ - Returns a list of key-value pairs computed from the given item. + """Returns a list of key-value pairs computed from the given item. Example result: [ (item.user_id, item.tags[0]), (item.user_id, item.tags[1]), ...
] - :param item: the item to map - :return: a list of key-value pairs + + Args: + item (SocialDataItem): the item to map + + Returns: + Iterable[Tuple[K, V]]: an iterable of key-value pairs """ pass diff --git a/parsoda/model/function/reducer.py b/parsoda/model/function/reducer.py index 62f98af..4f3ef7d 100644 --- a/parsoda/model/function/reducer.py +++ b/parsoda/model/function/reducer.py @@ -12,10 +12,13 @@ class Reducer(ABC, Generic[K, V, R]): """ def reduce(self, key: K, values: List[V]) -> R: - """ - Applies the reduction algorithm to values - :param key: the key all values are associated to - :param values: all the values associated to the key - :return: the reduced value + """Applies the reduction algorithm to values + + Args: + key (K): the key all values are associated to + values (List[V]): all the values associated to the key + + Returns: + R: the reduced value """ pass \ No newline at end of file diff --git a/parsoda/model/function/visualizer.py b/parsoda/model/function/visualizer.py index d173198..30e2fbe 100644 --- a/parsoda/model/function/visualizer.py +++ b/parsoda/model/function/visualizer.py @@ -11,4 +11,9 @@ class Visualizer(ABC, Generic[A]): @abstractmethod def visualize(self, result: A) -> None: + """Transforms data from the analysis step in some output format, then write them to some output device or system. 
+ + Args: + result (A): the data resulting from the analysis step + """ pass diff --git a/parsoda/model/social_data_app.py b/parsoda/model/social_data_app.py index 55fefdd..4b748b0 100644 --- a/parsoda/model/social_data_app.py +++ b/parsoda/model/social_data_app.py @@ -44,49 +44,124 @@ def __init__( self.__reduce_result_length = reduce_result_length - def get_app_name(self): + def get_app_name(self)->str: + """Gets the referred application name + + Returns: + str: the app name + """ return self.__app_name - def get_driver(self): + def get_driver(self)->ParsodaDriver: + """Gets the driver used by the application + + Returns: + ParsodaDriver: the driver object + """ return self.__driver - def get_partitions(self): + def get_partitions(self)->int: + """Gets the number of partitions used during execution + + Returns: + int: number of partitions + """ return self.__partitions - def get_chunk_size(self): + def get_chunk_size(self)->int: + """Gets the data chunk size, i.e. the partition size, used during execution + + Returns: + int: data chunk size + """ return self.__chunk_size - def get_crawling_time(self): + def get_crawling_time(self)->float: + """Gets the time spent on crawling + + Returns: + float: the crawling time in seconds + """ return self.__crawling_time - def get_filter_time(self): + def get_filter_time(self)->float: + """Gets the time spent on filtering + + Returns: + float: the filter time in seconds + """ return self.__filter_time - def get_map_time(self): + def get_map_time(self)->float: + """Gets the time spent on mapping + + Returns: + float: the map time in seconds + """ return self.__map_time - def get_split_time(self): + def get_split_time(self)->float: + """Gets the time spent on splitting + + Returns: + float: the split time in seconds + """ return self.__split_time - def get_reduce_time(self): + def get_reduce_time(self)->float: + """Gets the time spent on reduction + + Returns: + float: the reduce time in seconds + """ return self.__reduce_time
- def get_analysis_time(self): + def get_analysis_time(self)->float: + """Gets the time spent on analysis + + Returns: + float: the analysis time in seconds + """ return self.__analysis_time - def get_visualization_time(self): + def get_visualization_time(self)->float: + """Gets the time spent on visualization + + Returns: + float: the visualization time in seconds + """ return self.__visualization_time - def get_total_execution_time(self): + def get_parallel_execution_time(self)->float: + """Gets the time spent on parallel execution, i.e. the time spent from filtering to reduction. + + Returns: + float: the parallel execution time + """ return self.__filter_to_reduce_time - def get_total_execution_time(self): + def get_total_execution_time(self)->float: + """Gets the time spent on execution, from filtering to visualization, excluding the crawling step + + Returns: + float: the total execution time in seconds + """ return self.__total_execution_time def get_total_time(self): + """Gets the total time spent for completing the application, from crawling to visualization, i.e. the response time + + Returns: + float: the total response time + """ return self.__total_time def get_reduce_result_length(self): + """Gets the number of items obtained by executing the reduction. This value can be used for debugging purposes and for testing the correctness of the parallel execution. + + Returns: + float: the length of the reduction result + """ return self.__reduce_result_length def __repr__(self): @@ -117,6 +192,14 @@ def __str__(self): "| Reduce result length: " + str(self.__reduce_result_length) + "\n" def to_csv_line(self, separator: str = ";") -> str: + """Creates a CSV (Comma Separated Value) line for this report, by using the specified separator + + Args: + separator (str, optional): The values separator. Defaults to ";".
+ + Returns: + str: the CSV line + """ return \ str(self.__app_name)+separator+\ str(self.__partitions)+separator+\ @@ -134,6 +217,16 @@ def to_csv_line(self, separator: str = ";") -> str: str(self.__reduce_result_length) def to_csv_titles(self, separator: str = ";") -> str: + """Creates a CSV (Comma Separated Value) header line, by using the specified separator. + The returned line contains just the standard titles of report columns. + It can be used for writing the first header line of a CSV file that would store more than one execution report. + + Args: + separator (str, optional): The values separator. Defaults to ";". + + Returns: + str: the columns titles in a CSV line + """ return \ "App Name"+separator+\ "Partitions"+separator+\ @@ -208,6 +301,12 @@ def __init__(self, app_name: str, driver: ParsodaDriver, num_partitions=None, ch def set_num_partitions(self, num_partitions: int): """ Sets the number of partitions. This is overriden by the chunk size if it is set. + + Args: + num_partitions (int): The wanted partitions number per crawler + + Returns: + SocialDataApp: this SocialDataApp instance """ self.__num_partitions = num_partitions return self @@ -215,14 +314,41 @@ def set_num_partitions(self, num_partitions: int): def set_chunk_size(self, chunk_size: int): """ Sets the data chunk size in megabytes. This parameter overrides the number of partitions. 
+ + Args: + chunk_size (int): The wanted partition size + + Returns: + SocialDataApp: this SocialDataApp instance """ self.__chunk_size = chunk_size return self def set_report_file(self, filename: str): + """ + Sets the file name of the ParSoDA report + + Args: + filename (str): The file name where the report will be saved + + Returns: + SocialDataApp: this SocialDataApp instance + """ self.__report_file = filename + return self def set_crawlers(self, crawlers: List[Crawler]): + """Sets the list of crawlers to be used for loading data + + Args: + crawlers (List[Crawler]): A list of Crawler objects + + Raises: + Exception: if no crawler is given + + Returns: + SocialDataApp: this SocialDataApp instance + """ if crawlers is None or len(crawlers) == 0: raise Exception("No crawler given") self.__crawlers = [] @@ -231,6 +357,17 @@ def set_crawlers(self, crawlers: List[Crawler]): return self def set_filters(self, filters: List[Filter]): + """Sets the filters to be applied to data + + Args: + filters (List[Filter]): the list of filters + + Raises: + Exception: if no filter is given + + Returns: + SocialDataApp: this SocialDataApp instance + """ if filters is None or len(filters) == 0: raise Exception("No filter given") self.__filters = [] @@ -239,38 +376,101 @@ def set_filters(self, filters: List[Filter]): return self def set_mapper(self, mapper: Mapper[K, V]): + """Sets the mapper to be used in Map-Reduce step + + Args: + mapper (Mapper[K, V]): the mapper + + Raises: + Exception: if no mapper is given + + Returns: + SocialDataApp: this SocialDataApp instance + """ if mapper is None: raise Exception("No mapper given") self.__mapper = mapper return self def set_secondary_sort_key(self, key_function: Callable[[V], SORTABLE_KEY]): + """Sets a key-function to be used for secondary sort step. If no key-function is set, the secondary sort will not be executed. + A key object returned by the specified key-function must be a sortable object, i.e. 
an object that can be compared to other objects of the same type. + + Args: + key_function (Callable[[V], SORTABLE_KEY]): a callable object which maps an item to a key. + + Raises: + Exception: if no key-function is given + + Returns: + SocialDataApp: this SocialDataApp instance + """ if key_function is None: raise Exception("No key function given") self.__secondary_sort_key_function = key_function return self def set_reducer(self, reducer: Reducer[K, V, R]): + """Sets the reducer to be used in Map-Reduce step. + + Args: + reducer (Reducer[K, V, R]): the reducer + + Raises: + Exception: if no reducer is given + + Returns: + SocialDataApp: this SocialDataApp instance + """ if reducer is None: raise Exception("No reducer given") self.__reducer = reducer return self def set_analyzer(self, analyzer: Analyzer[K, R, A]): + """Sets an optional analysis function. + This could be a function that creates a new SocialDataApp instance and could use the same driver given in the current one. + + Args: + analyzer (Analyzer[K, R, A]): the analyzer + + Raises: + Exception: if no analyzer is given + + Returns: + SocialDataApp: this SocialDataApp instance + """ if analyzer is None: raise Exception("No analyzer given") self.__analyzer = analyzer return self def set_visualizer(self, visualizer: Visualizer[A]): + """Set an optional function for the visualization step (e.g., a function that writes results to a file) + + Args: + visualizer (Visualizer[A]): the visualizer + + Raises: + Exception: if no analyzer is given + + Returns: + SocialDataApp: this SocialDataApp instance + """ if visualizer is None: raise Exception("No visualizer given") self.__visualizer = visualizer return self def execute(self) -> ParsodaReport: - #locale.setlocale(locale.LC_ALL, "en_US.utf8") + """Runs the application and returns a report about its execution. + Raises: + Exception: if some preliminary check fails (e.g. no crawlers are set) or some ParSoDA step fails during the execution. 
+ + Returns: + ParsodaReport: the execution report + """ # Check application components if self.__crawlers is None or len(self.__crawlers) == 0: raise Exception("No crawler is set") @@ -296,7 +496,7 @@ def execute(self) -> ParsodaReport: reducer = self.__reducer secondary_key = self.__secondary_sort_key_function - # Staart ParSoDA workflow, initialize driver + # Start ParSoDA workflow, initialize driver print(f"[ParSoDA/{self.__app_name}] initializing driver: {type(self.__driver).__name__}") driver.set_chunk_size(self.__chunk_size*1024*1024) driver.set_num_partitions(self.__num_partitions) diff --git a/parsoda/model/social_data_item.py b/parsoda/model/social_data_item.py index 4eb260d..d9b667e 100644 --- a/parsoda/model/social_data_item.py +++ b/parsoda/model/social_data_item.py @@ -1,11 +1,15 @@ from datetime import datetime import json -from typing import Optional, List, Dict +from typing import Optional, List, Dict, final from parsoda.utils.json_serializer import obj_to_json, obj_from_json +@final class ItemPostTime: + """ + Class used for representing the time of posts + """ year: int = None month: int = None day: int = None @@ -22,6 +26,11 @@ def __init__(self, other: datetime): self.second = other.second def to_datetime(self) -> datetime: + """Convert the object to a standard datetime object + + Returns: + datetime: the datetime object built + """ return datetime( self.year, self.month, @@ -41,17 +50,28 @@ def __eq__(self, other) -> bool: self.second == other.second def to_json(self) -> str: + """Serialize the item to JSON. + + Returns: + str: JSON representation of this item + """ json_dict = {} json_dict["timestamp"] = self.to_datetime().timestamp() return json.dumps(json_dict) @staticmethod def from_json(json_str: str): + """Deserialize a social data item from JSON. 
+ + Returns: + ItemPostTime: the deserialized post time + """ timestamp = int(json.loads(json_str)['timestamp']) dt = datetime.fromtimestamp(timestamp) return ItemPostTime(dt) +@final class ItemLocation: latitude: float longitude: float @@ -66,7 +86,11 @@ def __eq__(self, other) -> bool: self.longitude == other.longitude +@final class SocialDataItem: + """Class for defining ParSoDA-Py's standard representation of social data items. + An instance of this class can be serialized as a JSON + """ def __init__(self): self.id = '' @@ -81,27 +105,67 @@ def __init__(self): self.original_format: str = '' def has_user_id(self): + """Check if this item has a user ID. + + Returns: + Boolean: true if the item has a user ID, false otherwise + """ return self.user_id is not None and self.user_id != '' def has_user_name(self): + """Check if this item has a user name. + + Returns: + Boolean: true if the item has a user name, false otherwise + """ return self.user_name is not None and self.user_name != '' def has_text(self): + """Check if this item has text. + + Returns: + Boolean: true if the item has text, false otherwise + """ return self.text is not None and self.text != '' def has_tags(self) -> bool: + """Check if this item has tags. + + Returns: + Boolean: true if the item has tags, false otherwise + """ return len(self.tags) > 0 def has_extras(self) -> bool: + """Check if this item has extra data. + + Returns: + Boolean: true if the item has extra data, false otherwise. + """ return len(self.extras) > 0 def has_date_posted(self) -> bool: + """Check if this item has posting date. + + Returns: + Boolean: true if the item has posting date, false otherwise. + """ return self.date_posted is not None def has_location(self) -> bool: + """Check if this item has a location. + + Returns: + Boolean: true if the item has a location, false otherwise + """ return self.location is not None def to_json(self) -> str: + """Serialize the item to JSON.
+ + Returns: + str: JSON representation of this item + """ json_dict = {} json_dict['id'] = self.id json_dict['user_id'] = self.user_id @@ -118,6 +182,11 @@ def to_json(self) -> str: @staticmethod def from_json(json_string: str): + """Deserialize a social data item from JSON. + + Returns: + SocialDataItem: the deserialized data item + """ self = SocialDataItem() json_dict = json.loads(json_string) @@ -158,6 +227,8 @@ def __repr__(self) -> str: class SocialDataItemBuilder: + """Class used for building new social data items + """ def __init__(self): self.item = SocialDataItem() @@ -173,42 +244,120 @@ def build(self) -> SocialDataItem: return built def set_id(self, id: str): + """Sets the item ID + + Args: + id (str): the item ID + + Returns: + SocialDataItemBuilder: this builder + """ self.item.id = str(id) return self def set_user_id(self, user_id: str): + """Sets the user ID + + Args: + user_id (str): the user ID + + Returns: + SocialDataItemBuilder: this builder + """ self.item.user_id = str(user_id) return self def set_user_name(self, user_name: str): + """Sets the user name. + + Args: + user_name (str): the user name. + + Returns: + SocialDataItemBuilder: this builder + """ self.item.user_name = str(user_name) return self def set_date_posted(self, date_posted: datetime): + """Sets the posting date. + + Args: + date_posted (datetime): the posting date + + Returns: + SocialDataItemBuilder: this builder + """ self.item.date_posted = ItemPostTime(date_posted) return self def set_text(self, text: str): + """Sets the item text. + + Args: + text (str): the item text + + Returns: + SocialDataItemBuilder: this builder + """ self.item.text = str(text) return self def set_tags(self, tags: List): + """Sets the list of tags + + Args: + tags (List): the list of tags + + Returns: + SocialDataItemBuilder: this builder + """ self.item.tags = tags return self def set_location(self, latitude: float, longitude: float): + """Sets the item location.
+ + Args: + id (str): the item location + + Returns: + SocialDataItemBuilder: this builder + """ self.item.location = ItemLocation(latitude, longitude) return self def put_extra(self, key, value): + """Add extra data to the item + + Args: + key (str): the data key + value: the value data + + Returns: + SocialDataItemBuilder: this builder + """ self.item.extras[key] = value return self def del_extra(self, key): + """Delete an extra value. + + Args: + key (str): the key of the data value to remove + + Returns: + SocialDataItemBuilder: this builder + """ del self.item.extras[key] return self def clear_extras(self): + """Clears the extra data + + Returns: + SocialDataItemBuilder: this builder + """ self.item.extras.clear() return self @@ -216,8 +365,12 @@ def set_extras(self, extras: dict): """ Sets all extras at once from a standard dictionary. Invoking this method all the extras previously set are deleted. - :param extras: dictionary - :return: + + Args: + extras (str): the extra dictionary + + Returns: + SocialDataItemBuilder: this builder """ self.item.extras = {} for key in extras: @@ -225,5 +378,13 @@ def set_extras(self, extras: dict): return self def set_original_format(self, original_format: str): + """Sets a name for the item original format + + Args: + original_format (str): a name for the original format + + Returns: + SocialDataItemBuilder: this builder + """ self.item.original_format = original_format return self diff --git a/parsoda/utils/generic_utils.py b/parsoda/utils/generic_utils.py deleted file mode 100644 index 32c65b0..0000000 --- a/parsoda/utils/generic_utils.py +++ /dev/null @@ -1,110 +0,0 @@ -# coding=utf-8 -import json - -from parsoda_costantino.common.item.TwitterItem import TwitterItem -from parsoda_costantino.common.item import FlickrItem -import time -import os - - - - - -def map_json(line_in): - try: - ab = FlickrItem(line_in) - return ab - except: - return None - - -def remove_service_file(): - path_service_file = 
"./resources/serviceFile.json" - if os.path.exists(path_service_file): - os.remove(path_service_file) - - -def get_service_file(): - path_service_file = "./resources/serviceFile.json" - return path_service_file - - -def add_to_service_file(fileIn): - num_elements = 0 - - path_service_file = "./resources/serviceFile.json" - - with open(path_service_file, 'a') as f: - for line in open(fileIn, 'r'): - try: - f.write(line) - num_elements += 1 - except: - pass - f.close() - return num_elements - - -def loadJsons(fileIn, type): - jsons = [] - for line in open(fileIn, 'r'): - try: - jsons.append((line, type)) - except: - pass - return jsons - - -def readJsons(listIn): - jsons = [] - for line, type in listIn: - try: - jsons.append((json.loads(line), type)) - except: - pass - return jsons - - -def create_AbstractGeotaggedItems(line): - try: - jsonEl = json.loads(line) - except Exception: - return None - try: - return FlickrItem(jsonEl) - except Exception: - pass - try: - return TwitterItem(jsonEl) - except Exception as e: - pass - return None - - -""" -Le funzioni di crawling restituiscono una lista di tuple del tipo: - (stringa che rappresenta il json del post, tipo del social usato) - il tipo del social viene passato a runtime e puo essere "Flickr" e "Twitter" - al momento. - Questa funziona legge il json stringa, crea la versione python del json e - crea l'opportuno abstract_geotagged_item -""" - - -def create_abstract_objects(list_json): - data = readJsons(list_json) - - items = [] - for tuple in data: - if tuple[1] == "Flickr": - items.append(FlickrItem(tuple[0])) - elif tuple[1] == "Twitter": - items.append(TwitterItem(tuple[0])) - data = None - - return items - - -def printJsons(jsons): - for element in jsons: - print((json.dumps(element, indent=4, separators=(". ", " = "), sort_keys=True)))