Apache Spark
- An Online Book: Python Automation and Machine Learning for ICs by Yougui Liao -
Python Automation and Machine Learning for ICs                                                           http://www.globalsino.com/ICs/        



=================================================================================

Apache Spark is an open-source, distributed computing system that provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Originally developed at UC Berkeley's AMPLab, Spark is an efficient, general-purpose data processing engine. It handles batch processing (similar to Hadoop MapReduce) as well as workloads such as streaming analytics, interactive queries, machine learning, and graph processing, which makes it a versatile tool for big data challenges. Spark distributes data across a cluster and processes it in parallel, keeping intermediate results in memory where possible, which can make it significantly faster than older, disk-based big data technologies.

The integral components of Spark's architecture and functionality are:

  • Functional Programming and Lambda Functions:

    • Apache Spark is written primarily in Scala, a language that combines object-oriented and functional programming. Functional programming principles are key to how transformations and actions are applied to data in Spark.
    • Lambda functions are anonymous functions defined on the fly. In Spark, they are frequently used to specify the logic of operations on datasets, such as map or filter, in a concise manner. Lambda functions make it easy to write distributed computing code.
  • Resilient Distributed Datasets (RDDs):
    • RDDs are the fundamental data structure of Spark. They allow data to be processed in parallel across many nodes of a cluster, which is essential for handling large datasets.
    • RDDs are designed to be resilient to node failures within a cluster. They achieve fault tolerance through lineage information: each RDD records the transformations that produced it, so Spark can recompute lost partitions by replaying those transformations on the original dataset.
  • Parallel Programming and Resilience:
    • Parallel programming in Spark enables the processing of large volumes of data by distributing tasks across multiple computing nodes. This division is crucial for enhancing the performance and scalability of data processing tasks.
    • Resilience in Spark comes from its ability to handle failures and ensure that the system continues to function. RDDs and their lineage contribute to this resilience by enabling the system to recover data after node failures.
  • Spark SQL and DataFrame Queries:
    • Spark SQL is a module for structured data processing. It allows users to execute SQL queries on data, which can be helpful for those familiar with SQL but new to big data technologies.
    • DataFrames are a part of Spark SQL and are similar in concept to DataFrames in Python's pandas library or tables in a relational database. They allow for more optimized storage and processing of data compared to RDDs, particularly through the use of Spark's Catalyst optimizer and Tungsten execution engine. The two work in sequence: Catalyst optimizes the query plan, and Tungsten then executes it with efficient memory management and code generation.
    • DataFrames and Spark SQL provide functionality for complex queries, aggregation, and data analysis, which are often necessary for big data applications.
  • Parts, Benefits, and Functions of Spark SQL and DataFrame Queries:         
    • The integration of SQL capabilities with the functional programming features of Spark allows for both declarative queries (like SQL) and functional programming (like Scala transformations), offering flexibility and power in data processing.
    • Users benefit from Spark SQL's ability to seamlessly mix SQL queries with programmatic data manipulations, all while maintaining high performance and ease of use.
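
The functional style described in the list above can be sketched in plain Python, without a cluster. This is only an analogy, not Spark code: the sample data and variable names are hypothetical, and the built-ins filter, map, and functools.reduce stand in for the RDD methods of the same names.

```python
from functools import reduce

# Hypothetical sample data: (part_id, measured_value) pairs.
readings = [("A", 3), ("B", 12), ("C", 7), ("D", 20)]

# filter-like step: keep readings whose value is above a threshold.
# (The analogous PySpark call would be rdd.filter(lambda r: r[1] > 5).)
large = filter(lambda r: r[1] > 5, readings)

# map-like step: keep only the value and double it.
# (Analogous to rdd.map(lambda r: r[1] * 2).)
doubled = map(lambda r: r[1] * 2, large)

# reduce-like step: sum the doubled values.
# (Analogous to rdd.reduce(lambda a, b: a + b).)
total = reduce(lambda a, b: a + b, doubled)

print(total)
```

Each lambda is a small, side-effect-free function, which is what allows a system like Spark to ship the same logic to many workers and apply it to separate partitions independently.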

Apache Spark provides machine learning capabilities through its MLlib component. MLlib is Spark's scalable machine learning library, designed to deliver high-quality algorithms that exploit Spark's distributed computing architecture. It supports machine learning tasks such as classification, regression, clustering, and collaborative filtering on big datasets.

MLlib features several common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, as well as underlying optimization primitives. It's built to be scalable and to integrate seamlessly with the rest of the Spark ecosystem, enabling you to include machine learning in your data processing pipelines efficiently.

Since Spark can keep intermediate data in memory and processes large datasets in parallel, it can be significantly faster than disk-based big data technologies like Hadoop MapReduce, especially for iterative algorithms that pass over the same data many times. This makes MLlib a powerful tool for data scientists looking to apply machine learning on large datasets, including in near-real-time settings.

As Spark processes and manipulates data during task execution, the DAGScheduler enhances efficiency by breaking each job into stages of tasks and scheduling those tasks on the worker nodes throughout the cluster. Because the directed acyclic graph (DAG) records the operations applied to the data, it also enables fault tolerance: Spark can reapply the recorded operations to data from an earlier state.
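
The idea of recovering data by reapplying recorded operations can be illustrated with a toy model in plain Python. This is a sketch under stated assumptions, not how Spark is implemented: the LineageDataset class and its methods are hypothetical, and the source data is assumed to be re-readable.

```python
class LineageDataset:
    """Toy stand-in for an RDD: source data plus a recorded list of operations."""

    def __init__(self, source, ops=()):
        self.source = source      # original input, assumed to be re-readable
        self.ops = tuple(ops)     # lineage: ordered (kind, function) pairs
        self._cached = None       # materialized result, which may be "lost"

    def map(self, fn):
        # Record the operation instead of running it.
        return LineageDataset(self.source, self.ops + (("map", fn),))

    def filter(self, fn):
        return LineageDataset(self.source, self.ops + (("filter", fn),))

    def compute(self):
        """Replay the recorded operations, in order, on the source data."""
        data = list(self.source)
        for kind, fn in self.ops:
            if kind == "map":
                data = [fn(x) for x in data]
            else:
                data = [x for x in data if fn(x)]
        self._cached = data
        return data

    def lose_cached_result(self):
        """Simulate a node failure that drops the materialized data."""
        self._cached = None


ds = LineageDataset(range(6)).map(lambda x: x * x).filter(lambda x: x % 2 == 0)
first = ds.compute()
ds.lose_cached_result()       # the result is gone, but the lineage is not
recovered = ds.compute()      # replaying the lineage rebuilds the same data
print(first, recovered)
```

The recovered result matches the first one because the lineage, not the data itself, is what has to survive the failure.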

Transformations in Spark are lazy: they do not compute their results immediately. The actual computation happens only when an action is called on the RDD (Resilient Distributed Dataset). This allows Spark to optimize the overall data processing workflow by reducing the number of passes it needs to take over the data and combining operations when possible. Lazy evaluation, while beneficial for optimization and efficiency, also introduces several disadvantages:

  • Debugging Difficulty: Because transformations do not produce an immediate output, it can be harder to debug them. Errors in the data processing logic might not become apparent until an action is performed, which could be much later in the code. This delay can make it difficult to trace back to the root cause of the problem.
  • Common failures on a cluster: Spark applications running on a cluster most often fail because of problems in user code, configuration, application dependencies, resource allocation, or network communication.
  • Unexpected Behavior on Actions: Since transformations are only executed when an action is called, any issues with the transformations (such as data corruption or logical errors) only surface at that point. This can lead to unexpected results or performance issues that are not evident during the coding phase.
  • Latency in Execution: Although lazy evaluation is generally beneficial for performance optimization, it can also lead to higher latency before the first result appears. Spark defers planning and running the accumulated work until an action is invoked, so all of it executes at that point, potentially leading to a delay before any result is visible.
  • Resource Utilization Uncertainty: Because the actual computation doesn't occur until an action is called, it can be challenging to predict resource needs in advance. This uncertainty can complicate resource management, especially in shared or constrained environments.
  • Complexity in Understanding Program Flow: For new users or developers, understanding the flow of a program that uses lazy evaluation can be more complex. It might not be intuitively clear when data is actually being processed or moved, which can lead to misunderstandings about how the program operates.
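
The debugging difficulty described above can be reproduced with a plain-Python generator, which is lazy in a similar way. This is an analogy rather than Spark code: the parse function and the input records are hypothetical, building the generator plays the role of a transformation, and consuming it with list() plays the role of an action.

```python
def parse(record):
    """Transformation logic with a latent bug: it fails on non-numeric input."""
    return int(record)


records = ["1", "2", "oops", "4"]   # hypothetical input with one bad record

# "Transformation": building the generator does not call parse() yet,
# so the bad record goes unnoticed at this line.
parsed = (parse(r) for r in records)

# "Action": consuming the generator finally runs parse(), and only now
# does the error surface, away from the line that defined the logic.
try:
    result = list(parsed)
    action_failed = False
except ValueError as exc:
    action_failed = True
    error_message = str(exc)

print(action_failed)
```

In a real Spark job, the same pattern means that the stack trace points at the action, so it helps to trigger a small action (for example on a sample of the data) soon after defining a non-trivial transformation.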

In typical data processing workflows using Apache Spark, operations are generally performed in the following order:

  • Read: The first step involves reading data from various sources. This could include files, databases, or real-time data streams.
  • Transform: After reading the data, transformations are applied. This step may include filtering, aggregating, joining, or other operations to modify or enrich the data as per the requirements.
  • Analyze: The transformed data is examined or queried. In practice, this often overlaps with, or is part of, the transformation step; listing it separately emphasizes a more focused examination of the transformed data.
  • Write: Finally, the processed data is written out to a storage system, which could be a file system, database, or a data warehouse.
  • Load: In ETL (Extract, Transform, Load) terminology, "load" means moving data into a final target database. In many Spark workflows, this is not a separate stage after writing; it is synonymous with, or included in, the "write" step.
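
The order of these steps can be sketched with Python's standard csv module. This is a minimal single-machine illustration, not a Spark job: the in-memory buffers stand in for real sources and targets, and the sample rows and field names are hypothetical.

```python
import csv
import io

# Read: parse CSV text (an in-memory stand-in for a file or table).
source = io.StringIO("name,age\nJohn,25\nPeter,30\nJulie,35\n")
rows = list(csv.DictReader(source))

# Transform: cast types, filter rows, and derive a new field.
over_28 = [
    {"name": r["name"], "age": int(r["age"]), "age_next_year": int(r["age"]) + 1}
    for r in rows
    if int(r["age"]) > 28
]

# Analyze: a focused look at the transformed data.
average_age = sum(r["age"] for r in over_28) / len(over_28)

# Write (load): emit the result to the target, here another in-memory buffer.
target = io.StringIO()
writer = csv.DictWriter(
    target, fieldnames=["name", "age", "age_next_year"], lineterminator="\n"
)
writer.writeheader()
writer.writerows(over_28)

print(average_age)
print(target.getvalue())
```

In PySpark the same stages would typically map to spark.read, DataFrame transformations, an aggregation or query, and df.write.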

Table 3404a. Cheatsheet of Apache Spark.

Package/Method  Description  Code Example
appName()  Sets a name for the application, which is displayed on the cluster web UI.
      from pyspark.sql import SparkSession
      spark = SparkSession.builder.appName("MyApp").getOrCreate()
cache()  Often used on a DataFrame, Dataset, or RDD when you want to perform multiple actions on it. cache() marks the specified DataFrame, Dataset, or RDD to be kept in the memory of your cluster's workers. Like a transformation, cache() is evaluated lazily: the data is actually cached only when a Spark action (for example, count(), show(), take(), or write()) first runs on the same DataFrame, Dataset, or RDD, and later actions then reuse the cached data.
      df = spark.read.csv("customer.csv")
      df.cache()
count()  An action that returns the number of rows in a DataFrame (or the number of elements in an RDD).
      count = df.count()
      print(count)
createTempView()  Creates a temporary view that can later be used to query the data. The only required parameter is the name of the view. createTempView() raises an error if a view with that name already exists; the createOrReplaceTempView() variant shown here replaces the existing view instead.
      df.createOrReplaceTempView("cust_tbl")
filter()  Returns a new DataFrame that contains only the rows satisfying the given condition.
      filtered_df = df.filter(df['age'] > 30)
getOrCreate()  Returns the existing SparkSession or, if there is none, creates a new one based on the options set in the builder.
      spark = SparkSession.builder.getOrCreate()
import  Used to make code from one module accessible in another. Using imports effectively lets you reuse code and keep your projects manageable.
      from pyspark.sql import SparkSession
len()  Python built-in that returns the number of items in an object. When the object is a string, len() returns the number of characters in the string. In the example, collect() first brings every row to the driver, so df.count() is usually preferable for large DataFrames.
      row_count = len(df.collect())
      print(row_count)
map()  An RDD transformation that returns a new RDD by applying the given function to each element.
      rdd = df.rdd.map(lambda row: (row['name'], row['age']))
pip  Python's package installer. It searches for a requested package in the Python Package Index (PyPI), resolves any dependencies, and installs everything in your current Python environment. pip list shows the packages that are already installed.
      pip list
pip install  Looks for the latest version of the package and installs it.
      pip install pyspark
print()  Prints the specified message to the screen or other standard output device. The message can be a string or any other object; the object is converted into a string before being written to the screen.
      print("Hello, PySpark!")
printSchema()  Prints the schema of the DataFrame or Dataset in tree format, with each column name and data type. For a DataFrame or Dataset with a nested structure, it displays the schema as a nested tree.
      df.printSchema()
sc.parallelize()  Creates a parallelized collection: distributes a local Python collection to form an RDD. For performance, using range is recommended if the input represents a range. Here sc is the SparkContext, available as spark.sparkContext.
      rdd = sc.parallelize([1, 2, 3, 4, 5])
select()  Used to select one or multiple columns, nested columns, a column by index, all columns from a list, or columns matched by regular expression from a DataFrame. select() is a transformation and returns a new DataFrame with the selected columns.
      selected_df = df.select('name', 'age')
show()  Displays the contents of the DataFrame in a table of rows and columns. By default, it shows only 20 rows, and the column values are truncated at 20 characters.
      df.show()
spark.read.json  Spark SQL can automatically infer the schema of a JSON data set and load it as a DataFrame. read.json() loads data from a JSON file or a directory of JSON files in which each line is a separate, self-contained JSON object. Note that such a file is not a typical JSON file: by default, a single JSON document pretty-printed across several lines is not read as expected.
      json_df = spark.read.json("customer.json")
spark.sql()  To issue any SQL query, use the sql() method on the SparkSession instance. Every query executed in this manner returns a DataFrame on which you may perform further Spark operations if required.
      result = spark.sql("SELECT name, age FROM cust_tbl WHERE age > 30")
      result.show()
time()  Python's time.time() returns the current time as the number of seconds since the Unix epoch. The example instead uses Spark SQL's current_timestamp(), which returns the timestamp at the start of query evaluation as a column.
      from pyspark.sql.functions import current_timestamp
      current_time = df.select(current_timestamp().alias("current_time"))
      current_time.show()
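
The lazy behavior of cache() listed in the table can be modeled in a few lines of plain Python. This is a toy sketch, not Spark's caching mechanism: the LazyCached class, its methods, and the compute_calls counter are all hypothetical names introduced for illustration.

```python
class LazyCached:
    """Toy model of a cached dataset: compute on first action, reuse afterwards."""

    def __init__(self, compute_fn):
        self._compute_fn = compute_fn
        self._data = None
        self.compute_calls = 0      # how often the expensive work actually ran

    def _materialize(self):
        if self._data is None:      # nothing cached yet
            self.compute_calls += 1
            self._data = self._compute_fn()
        return self._data

    def count(self):                # first "action"
        return len(self._materialize())

    def take(self, n):              # second "action"
        return self._materialize()[:n]


cached = LazyCached(lambda: [x * 10 for x in range(5)])
calls_before_action = cached.compute_calls   # marking as cached did no work
n = cached.count()                           # first action fills the cache
head = cached.take(2)                        # second action reuses it
print(calls_before_action, n, head, cached.compute_calls)
```

The counter stays at zero until the first action and at one afterwards, which mirrors why calling cache() alone does not speed anything up until an action has run.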

Table 3404b. Cheatsheet of monitoring and tuning.

Package/Method  Description  Code Example
agg()  Used to get aggregate values such as count, sum, avg, min, and max for each group.
      agg_df = df.groupBy("column_name").agg({"column_to_aggregate": "sum"})
cache()  Often used on a DataFrame, Dataset, or RDD when you want to perform more than one action on it. cache() marks the specified DataFrame, Dataset, or RDD to be kept in the memory of your cluster's workers. Like a transformation, cache() is evaluated lazily: the data is actually cached only when a Spark action (for example, count(), show(), take(), or write()) first runs on the same DataFrame, Dataset, or RDD.
      df = spark.read.csv("customer.csv")
      df.cache()
cd  Used to move from the current working directory to a different directory on your system. Basic syntax of the cd command:
      cd [options]… [directory]
      # Example 1: Change to a specific directory.
      cd /usr/local/folder1
      # Example 2: Get back to the previous working directory.
      cd -
      # Example 3: Move up one level from the present working directory tree.
      cd ..
def  Used to define a function. It is placed before a function name that is provided by the user to create a user-defined function.
      def greet(name):
          # This function takes a name as a parameter and prints a greeting.
          print(f"Hello, {name}!")
      # Calling the function:
      greet("John")
docker exec  Runs a new command in a running container. The command runs only while the container's primary process is running, and it is not restarted if the container is restarted.
      docker exec -it container_name command_to_run
      docker exec -it my_container /bin/bash
docker rm  Used to remove one or more containers.
      # To remove a single container by name or ID:
      docker rm container_name_or_id
      # To remove multiple containers by specifying their names or IDs:
      docker rm container1_name_or_id container2_name_or_id
      # To remove all stopped containers (docker ps -aq lists every container ID;
      # docker rm refuses to remove running containers unless -f is given):
      docker rm $(docker ps -aq)
docker run  Runs a command in a new container, pulling the image if needed and starting the container.
      docker run [OPTIONS] IMAGE [COMMAND] [ARG...]
for  The for loop operates on lists of items. It repeats a set of commands for every item in a list.
      fruits = ["apple", "banana", "cherry"]
      # Iterating through the list using a for loop:
      for fruit in fruits:
          print(f"I like {fruit}s")
groupby()  Used to collect identical data into groups on a DataFrame and perform count, sum, avg, min, and max functions on the grouped data. The example uses a pandas DataFrame; on a PySpark DataFrame the equivalent method is groupBy().
      import pandas as pd
      data = {'Category': ['A', 'B', 'A', 'B', 'A', 'B'], 'Value': [10, 20, 15, 25, 30, 35]}
      df = pd.DataFrame(data)
      # Grouping by "Category" and performing aggregation operations:
      grouped = df.groupby('Category').agg({'Value': ['count', 'sum', 'mean', 'min', 'max']})
      print(grouped)
repartition()  Used to increase or decrease the number of partitions of an RDD or DataFrame, either to a given number of partitions or, for a DataFrame, by one or more column names.
      from pyspark.sql import SparkSession
      spark = SparkSession.builder.getOrCreate()
      # Create a sample DataFrame:
      data = [("John", 25), ("Peter", 30), ("Julie", 35), ("David", 40), ("Eva", 45)]
      columns = ["Name", "Age"]
      df = spark.createDataFrame(data, columns)
      # Show the current number of partitions.
      print("Number of partitions before repartitioning: ", df.rdd.getNumPartitions())
      # Repartition the DataFrame to 2 partitions.
      df_repartitioned = df.repartition(2)
      # Show the number of partitions after repartitioning.
      print("Number of partitions after repartitioning: ", df_repartitioned.rdd.getNumPartitions())
      # Stop the SparkSession.
      spark.stop()
return  Used to end the execution of the function call and return the result (the value of the expression following the return keyword) to the caller.
      def add_numbers(a, b):
          result = a + b
          return result
      # Calling the function and capturing the returned value:
      sum_result = add_numbers(5, 6)
      # Printing the result.
      print("The sum is:", sum_result)
show()  Spark DataFrame show() is used to display the contents of the DataFrame in a table row and column format. By default, it shows only 20 rows, and the column values are truncated at 20 characters.
      df.show()
spark.read.csv("path")  Using this, you can read a CSV file with fields delimited by pipe, comma, tab (and many more) into a Spark DataFrame.
      from pyspark.sql import SparkSession
      # Create a SparkSession.
      spark = SparkSession.builder.appName("CSVReadExample").getOrCreate()
      # Read a CSV file into a Spark DataFrame.
      df = spark.read.csv("path_to_csv_file.csv", header=True, inferSchema=True)
      # Show the first few rows of the DataFrame.
      df.show()
      # Stop the SparkSession.
      spark.stop()
wget  Stands for "web get". wget is a free, noninteractive file downloader command. Noninteractive means that it can work in the background when the user is not logged in. Basic syntax of the wget command; commonly used options are [-V], [-h], [-b], [-e], [-o], [-a], [-q]:
      wget [options]… [URL]…
      # Example 1: Download file.txt over HTTP into the working directory.
      wget http://example.com/file.txt
      # Example 2: Download archive.zip over HTTP in the background, returning you to the command prompt in the interim.
      wget -b http://www.example.org/files/archive.zip
withColumn()  Transformation function of DataFrame that is used to change the values of an existing column, convert its data type, or create a new column.
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col, when
      spark = SparkSession.builder.getOrCreate()
      # Sample DataFrame:
      data = [("John", 25), ("Peter", 30), ("David", 35)]
      columns = ["Name", "Age"]
      df = spark.createDataFrame(data, columns)
      # Create a new column "DoubleAge" by doubling the "Age" column.
      updated_df = df.withColumn("DoubleAge", col("Age") * 2)
      # Create a new column "AgeGroup" based on conditions.
      updated_df = updated_df.withColumn(
          "AgeGroup",
          when(col("Age") <= 30, "Young")
          .when((col("Age") > 30) & (col("Age") <= 40), "Middle-aged")
          .otherwise("Old"),
      )
      updated_df.show()
      # Stop the SparkSession.
      spark.stop()

 

=================================================================================