Raspberry Pi IoT In Python Using Linux Drivers

Programs

Cdrivers360

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a NetBeans or VS Code project. 

 

Note: The VS Code task listings are at the very end of this page.

The only downside is that you have to create a project to paste the code into.

To do this, follow the instructions in the book - in particular, make sure you have added any libraries to the link step and are running as root if required. 

All of the programs below were copied and pasted from working programs in the IDE. They have been formatted using the built-in formatter and hence are not identical in layout to the programs in the book. This is to make copying and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing, or if you have any requests or comments, contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

Page 32

import io
from time import sleep

# Select the "none" trigger for led0.
with io.open("/sys/class/leds/led0/trigger", "w") as trigger:
    trigger.write("none")

# Keep the brightness attribute open and alternate "0" and "1",
# flushing each write and pausing one second after it.
fd = io.open("/sys/class/leds/led0/brightness", "w")
while True:
    for level in ("0", "1"):
        fd.write(level)
        fd.flush()
        sleep(1)

 

Page 33

import io
from time import sleep

# Each led0 attribute is opened, written once and closed, in this order:
# first the trigger is set to "timer", then the two delay values.
for attribute, value in (("trigger", "timer"),
                         ("delay_on", "2000"),
                         ("delay_off", "3000")):
    with io.open("/sys/class/leds/led0/" + attribute, "w") as fd:
        fd.write(value)

Page 42

import gpiod
chip = gpiod.Chip("0")
# Print the chip's label, name and number of lines, one per output line.
for query in (chip.label, chip.name, chip.num_lines):
    print(query())
chip.close()

Page 45

import gpiod

# Open GPIO chip "0" and take line 4 from it.
chip = gpiod.Chip("0")
line = chip.get_line(4)

# Request the line as an output with a default value of 0.
line.request(consumer="myprog.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
# Set the line to 1 and then 0 forever, with no delay between the writes.
while(True):
    line.set_value(1)
    line.set_value(0)

Page 46

import gpiod

# Open GPIO chip "0" and fetch lines 4 and 17 as a single group.
chip = gpiod.Chip("0")
lines = chip.get_lines([4, 17])

# Request both lines as outputs with default values [0, 1].
lines.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_OUT,
              default_vals=[0, 1])
# Alternate between the two opposite value pairs forever, with no delay.
while(True):
    lines.set_values([1, 0])
    lines.set_values([0, 1])

Page 48

import gpiod
from time import sleep, time_ns

chip = gpiod.Chip("0")
line = chip.get_line(4)

# Step 1: hold line 4 as an input for 10 ms, then release it.
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()

# Step 2: request line 4 as an output with default value 0 for 10 ms,
# then release it.
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_OUT,
             default_vals=[0])
sleep(0.010)
line.release()

# Step 3: request the line as an input again and time how long it keeps
# reading 0.
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
t = time_ns()
while(line.get_value() == 0):
    pass  # busy-wait until the line stops reading 0
# Elapsed nanoseconds divided by 1000, i.e. microseconds.
print((time_ns()-t)/1000)

 

Page 55

import fcntl
import struct
import io
# ioctl request code passed to the GPIO character device below.
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401

# Reply layout: two 32-byte strings followed by one unsigned long.
gpiochip_info = struct.Struct("32s 32s L")
buffer = gpiochip_info.pack(b' ', b' ', 0)

# The device is only needed for the single ioctl call; the context manager
# closes it as soon as the reply has been returned (the original listing
# left it open).
with io.open("/dev/gpiochip0", "rb", buffering=0) as f:
    result = fcntl.ioctl(f, GPIO_GET_CHIPINFO_IOCTL, buffer)

name, label, lines = gpiochip_info.unpack(result)

# Strip the trailing NUL padding from the fixed-width strings before printing.
print(name.rstrip(b'\0').decode("utf-8"))
print(label.rstrip(b'\0').decode("utf-8"))
print(lines)

Page 59

import fcntl
import struct
import io

# ioctl request codes for the GPIO character device.
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

# Flag bits for the line-handle request.
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

# Binary layouts of the structures exchanged with the driver.
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

# Request lines 4 and 17 (the first two of 64 slots) as outputs.
lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT,
                                 *values, b"Output test", 2, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
# The handle's file descriptor is the last field of the reply. It must be
# read from `result`: `buffer` is an immutable bytes object, so ioctl returns
# the driver's reply as a new object and leaves `buffer` untouched.
# Offset 360 = 64*4 + 4 + 64 + 32 + 4, assuming "L" packs to 4 bytes.
fL = struct.unpack_from("L", result, 360)[0]

# Pre-pack the two alternating output states.
values[0] = 1
values[1] = 0
state1 = gpiohandle_data.pack(*values)

values[0] = 0
values[1] = 1
state2 = gpiohandle_data.pack(*values)

# Flip between the two states forever, with no delay.
while(True):
    result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state1)
    result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state2)

Page 60

import fcntl
import struct
import io
import os

# ioctl request codes for the GPIO character device.
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

# Flag bits for the line-handle request.
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

# Binary layouts of the structures exchanged with the driver.
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

# Request lines 4 and 17 (the first two of 64 slots) as inputs.
lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
                                 *values, b"Input test", 2, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
# The handle's file descriptor is the last field of the reply
# (offset 360 = 64*4 + 4 + 64 + 32 + 4, assuming "L" packs to 4 bytes).
fL = struct.unpack_from("L", result, 360)[0]
inputBuffer = gpiohandle_data.pack(*values)
# Read the current values; the reply has one byte per requested line.
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL,
                     inputBuffer)
os.close(fL)
print(result[0], result[1])

Page 61

import fcntl
import struct
import io
import os
from time import sleep, time_ns
# ioctl request codes for the GPIO character device.
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

# Flag bits for the line-handle request.
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

# Binary layouts of the structures exchanged with the driver.
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

# Only line 4 is requested; all values start at 0.
lines = [0]*64
lines[0] = 4
values = [0]*64

# Step 1: request the line as an input, read it once, wait 10 ms, release.
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
                                 *values, b"Resistance test", 1, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
# Handle fd is the last field of the reply (offset 360 with 4-byte "L").
fL = struct.unpack_from("L", result, 360)[0]
state = gpiohandle_data.pack(*values)
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)

# Step 2: request the line as an output, write 0 to it, wait 10 ms, release.
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT,
                                 *values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)

# Step 3: request the line as an input again and time how long it reads 0.
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
                                 *values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
t = time_ns()
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
while(result[0] == 0):
    result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
# Elapsed nanoseconds divided by 1000, i.e. microseconds.
print((time_ns()-t)/1000)
os.close(fL)
f.close()

Page 68

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

# Wait for an event with a one-second timeout and report the outcome.
event = line.event_wait(sec=1)
print("Event on line 4" if event else "Time out")

Page 69

import gpiod
from time import sleep, time_ns

chip = gpiod.Chip("0")
line = chip.get_line(4)

# Step 1: hold line 4 as an input for 10 ms, then release it.
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()

# Step 2: request line 4 as an output with default value 0 for 10 ms,
# then release it.
line.request(consumer="resistor.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
sleep(0.010)
line.release()

# Step 3: request rising-edge events and block until one is read.
line.request(consumer="resistor.py",
             type=gpiod.LINE_REQ_EV_RISING_EDGE)
t = time_ns()
event = line.event_read()
# Event timestamp (sec/nsec combined into nanoseconds) minus the start time,
# divided by 1000.
# NOTE(review): assumes the event timestamp and time_ns() share a clock - confirm.
print((event.sec*1000000000+event.nsec-t)/1000)

Page 70

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

# Repeatedly measure the time between a rising edge and the next falling edge.
while(True):
    # Discard events until a rising edge arrives.
    while True:
        event1 = line.event_read()
        if event1.type == gpiod.LineEvent.RISING_EDGE:
            break
    # Then discard events until the following falling edge.
    while True:
        event2 = line.event_read()
        if event2.type == gpiod.LineEvent.FALLING_EDGE:
            break
    # Pulse width in nanoseconds. The seconds term must subtract the rising
    # edge (event1) from the falling edge (event2); the original subtracted
    # event2.sec from itself, which is always 0 and gives a wrong (often
    # negative) result whenever the pulse crosses a second boundary.
    print((event2.sec-event1.sec)*1000000000 +
          (event2.nsec-event1.nsec))

Page 71a

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_IN)
print("Press the button", flush=True)
sleep(20)
# Only the line's value at this instant is examined.
pressed = line.get_value() == 1
print("button pressed" if pressed else "button not pressed")

Page 71b

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
print("Press the button", flush=True)
sleep(20)
# Check for an event with a zero-second timeout.
event = line.event_wait(sec=0)
print("button pressed" if event else "button not pressed")

Page 72

import gpiod


def event_poll(line):
    """Check *line* for an event without waiting.

    Calls ``line.event_wait(sec=0)``. If that result is truthy, the event is
    read with ``line.event_read()`` and returned; otherwise the falsy wait
    result itself is returned.
    """
    ready = line.event_wait(sec=0)
    return line.event_read() if ready else ready


chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
# Poll continuously; print the seconds field of every event that turns up.
while True:
    event = event_poll(line)
    if not event:
        continue
    print(event.sec, flush=True)

Page 75

import fcntl
import struct
import io
import os
from time import sleep, time_ns
import select

# ioctl request codes for the GPIO character device.
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408
GPIO_GET_LINEEVENT_IOCTL = 0xC030B404

# Flag bits for the line-handle request.
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

# Flag bits selecting which edges generate events.
GPIOEVENT_REQUEST_RISING_EDGE = 0x01
GPIOEVENT_REQUEST_FALLING_EDGE = 0x02
GPIOEVENT_REQUEST_BOTH_EDGES = 0x03

# Binary layouts of the structures exchanged with the driver.
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")
gpioevent_request = struct.Struct("L L L 32s L")
gpioevent_data = struct.Struct("Q L")

# Ask for events on both edges of line 4, configured as an input.
buffer = gpioevent_request.pack(4, GPIOHANDLE_REQUEST_INPUT,
                                GPIOEVENT_REQUEST_BOTH_EDGES, b"Event test", 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEEVENT_IOCTL, buffer)
f.close()
# The event file descriptor is the last field of the reply
# (offset 44 = 4 + 4 + 4 + 32, assuming "L" packs to 4 bytes).
fL = struct.unpack_from("L", result, 44)[0]

# Poll the event fd with a 1 second timeout and decode whatever arrives.
e = select.epoll()
e.register(fL)
while True:
    events = e.poll(1)
    if len(events) > 0:
        buffer = os.read(fL, 100)
        # Only the first 12 bytes (one "Q L" record) are decoded.
        timestamp, id = gpioevent_data.unpack(buffer[0:12])
        print(timestamp, id, flush=True)

Page 76

import gpiod
import struct
import io
import os
import select

# One event record as decoded below: a 64-bit value followed by an unsigned long.
gpioevent_data = struct.Struct("Q L")

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

# Take the line's event file descriptor and watch it with epoll.
fL = line.event_get_fd()
e = select.epoll()
e.register(fL)
while True:
    events = e.poll(1)
    if not events:
        continue  # the one-second poll timed out with nothing to read
    buffer = os.read(fL, 100)
    # Only the first 12 bytes are decoded.
    timestamp, id = gpioevent_data.unpack(buffer[:12])
    print(timestamp, id, flush=True)

Page 77

import gpiod
from time import sleep
import threading


def waitForInterrupt(line):
    """Print the timestamp of every event read from *line*, forever.

    Each event's ``sec`` and ``nsec`` fields are combined into a single
    nanosecond count before printing. The function never returns normally.
    """
    while True:
        edge = line.event_read()
        stamp = edge.sec*1000000000+edge.nsec
        print(stamp, flush=True)


chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

# Run waitForInterrupt on its own thread so that its blocking event reads
# do not hold up the loop below.
IntThread = threading.Thread(target=waitForInterrupt, args=(line,))
IntThread.start()


# Main thread: print a message every two seconds.
while True:
    sleep(2)
    print("running", flush=True)

Page 87

import io

# Read the device name reported by the first IIO device.
with io.open("/sys/bus/iio/devices/iio:device0/name",
             "rb", buffering=0) as fdr:
    name = fdr.readline().decode("utf-8")
print("name=", name)

# The temperature reading is divided by 1000 before being printed as C.
with io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",
             "rb", buffering=0) as fdr:
    temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")

# The humidity reading is scaled the same way. The original listing had
# stray spaces inside this path ("/ sys/...iio: device0 /..."), so the open
# could never succeed.
with io.open("/sys/bus/iio/devices/iio:device0/in_humidityrelative_input",
             "rb", buffering=0) as fdr:
    hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")

Page 89

import subprocess
import io
import fcntl


def checkDht11():
    """Load the dht11 overlay on gpiopin 4 unless it is already listed.

    Runs ``sudo dtparam -l`` and prints its captured output. If that output
    does not contain the dht11 indicator string, ``sudo dtoverlay dht11
    gpiopin=4`` is run and its captured output printed as well.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(listing.communicate())
    print(output)
    if "dht11  gpiopin=4" in output:
        return
    loader = subprocess.Popen(["sudo", "dtoverlay", "dht11", "gpiopin=4"],
                              stdout=subprocess.PIPE)
    print(str(loader.communicate()))


checkDht11()

# Device name of the first IIO device.
with io.open("/sys/bus/iio/devices/iio:device0/name",
             "rb", buffering=0) as fdr:
    name = fdr.readline().decode("utf-8")
print("name=", name)

# Temperature reading, divided by 1000 before printing.
with io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",
             "rb", buffering=0) as fdr:
    temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")

# Humidity reading, scaled the same way.
with io.open("/sys/bus/iio/devices/iio:device0/in_humidityrelative_input",
             "rb", buffering=0) as fdr:
    hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")

Page 111a

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])

# period is divided by 1000 below to get seconds, so it is in milliseconds;
# duty is a percentage of that period.
period = 20
duty = 25

# Seconds the line spends at 1 and at 0 in each cycle.
ontime = period/1000 * duty / 100
offtime = period/1000 - ontime

# Generate the waveform by sleeping between the two level changes.
while(True):
    line.set_value(1)
    sleep(ontime)
    line.set_value(0)
    sleep(offtime)

Page 111b

import gpiod
from time import sleep
import threading


def softPWM(line, period, duty):
    """Toggle *line* forever to produce a software PWM signal.

    *period* is the cycle length in milliseconds and *duty* the percentage
    of each cycle for which the line is set to 1. The function never returns.
    """
    high_seconds = period/1000 * duty / 100
    low_seconds = period/1000 - high_seconds
    phases = ((1, high_seconds), (0, low_seconds))
    while True:
        for level, hold in phases:
            line.set_value(level)
            sleep(hold)


chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])

# Arguments handed to softPWM: cycle length and duty percentage.
period = 20
duty = 75

# Run the PWM loop on its own thread so the main loop below keeps running.
IntThread = threading.Thread(target=softPWM, args=(line, period, duty))
IntThread.start()

# Main thread: print a message every two seconds.
while(True):
    print("working", flush=True)
    sleep(2)

Page 115

import subprocess
import io


def checkPWM():
    """Load the pwm-2chan overlay unless it is already listed.

    Runs ``sudo dtparam -l`` and prints its captured output. If "pwm-2chan"
    does not appear in it, ``sudo dtoverlay pwm-2chan`` is run and its
    captured output printed as well.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(listing.communicate())
    print(output)
    if "pwm-2chan" in output:
        return
    loader = subprocess.Popen(["sudo", "dtoverlay", "pwm-2chan"],
                              stdout=subprocess.PIPE)
    print(str(loader.communicate()))


checkPWM()

# (attribute under pwmchip0, bytes to write), applied in this order:
# export channel 0, set its period and duty cycle, then enable it.
for attribute, setting in (("export", b"0"),
                           ("pwm0/period", b"10000000"),
                           ("pwm0/duty_cycle", b"8000000"),
                           ("pwm0/enable", b"1")):
    fdw = io.open("/sys/class/pwm/pwmchip0/" + attribute, "wb", buffering=0)
    fdw.write(setting)
    fdw.close()

Page 116

import subprocess
import io
from time import sleep
import os


class Pwm:
    """Control one PWM channel through /sys/class/pwm/pwmchip0.

    Construction loads the pwm-2chan overlay if the overlay listing does not
    mention it, exports the channel if its directory is missing, and then
    applies the requested frequency and duty cycle. A channel other than
    0 or 1 makes ``__init__`` return early, leaving the object without any
    of its attributes.
    """

    def __init__(self, channel, f, duty):
        if channel not in (0, 1):
            return
        self.chan = str(channel)
        listing = subprocess.Popen(
            ["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
        output = str(listing.communicate())
        print(output, flush=True)
        if "pwm-2chan" not in output:
            loader = subprocess.Popen(
                ["sudo", "dtoverlay", "pwm-2chan"], stdout=subprocess.PIPE)
            output = str(loader.communicate())
            print(output, flush=True)

        chanDir = "/sys/class/pwm/pwmchip0/pwm"+self.chan
        if not os.path.exists(chanDir):
            with io.open("/sys/class/pwm/pwmchip0/export", "w") as fdw:
                fdw.write(self.chan)

        # Spin until the channel's enable file is visible.
        while not os.path.exists(chanDir+"/enable"):
            pass

        # The period and duty_cycle files stay open for later updates.
        self.fdwp = io.open(chanDir+"/period", "w")
        self.setFreq(f)
        self.fdwd = io.open(chanDir+"/duty_cycle", "w")
        self.setDuty(duty)

    def setFreq(self, f):
        """Store the period for frequency *f* (1e9/f, truncated) and write it."""
        self.f = int(1000000000/f)
        periodText = str(self.f)
        self.fdwp.write(periodText)
        self.fdwp.flush()

    def setDuty(self, duty):
        """Store *duty* percent of the current period and write it."""
        self.duty = int(self.f*duty/100)
        dutyText = str(self.duty)
        self.fdwd.write(dutyText)
        self.fdwd.flush()

    def enableChan(self):
        """Write "1" to the channel's enable file."""
        with io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable",
                     "w") as fdw:
            fdw.write("1")

    def disableChan(self):
        """Write "0" to the channel's enable file."""
        with io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable",
                     "w") as fdw:
            fdw.write("0")

    def closeChan(self):
        """Close the open attribute files and unexport the channel."""
        self.fdwd.close()
        self.fdwp.close()
        with io.open("/sys/class/pwm/pwmchip0/unexport", "w") as fdw:
            fdw.write(self.chan)

Page 127

import subprocess
import io
from time import sleep
import os


class Pwm:
    """Control one PWM channel through /sys/class/pwm/pwmchip0.

    Construction removes overlay 0 when pwm-2chan is already listed, loads
    pwm-2chan, re-exports the channel, and then applies the requested
    frequency and duty cycle. A channel other than 0 or 1 makes ``__init__``
    return early, leaving the object without any of its attributes.
    """

    def __init__(self, channel, f, duty):
        if channel not in (0, 1):
            return
        self.chan = str(channel)
        listing = subprocess.Popen(
            ["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
        output = str(listing.communicate())
        print(output, flush=True)
        if "pwm-2chan" in output:
            # Already listed: remove overlay 0 before loading it again.
            remover = subprocess.Popen(
                ["sudo", "dtoverlay", "-r", "0"], stdout=subprocess.PIPE)
            output = str(remover.communicate())
            print(output, flush=True)

        loader = subprocess.Popen(
            ["sudo", "dtoverlay", "pwm-2chan"], stdout=subprocess.PIPE)
        output = str(loader.communicate())
        print(output, flush=True)

        chanDir = "/sys/class/pwm/pwmchip0/pwm"+self.chan
        if os.path.exists(chanDir):
            with io.open("/sys/class/pwm/pwmchip0/unexport", "w") as fdw:
                fdw.write(self.chan)

        # Spin until the old enable file is gone, then export again.
        while os.path.exists(chanDir+"/enable"):
            pass
        with io.open("/sys/class/pwm/pwmchip0/export", "w") as fdw:
            fdw.write(self.chan)

        # Spin until the channel's period file is visible.
        while not os.path.exists(chanDir+"/period"):
            pass

        # The period and duty_cycle files stay open for later updates.
        self.fdwp = io.open(chanDir+"/period", "w")
        self.setFreq(f)
        self.fdwd = io.open(chanDir+"/duty_cycle", "w")
        self.setDuty(duty)

    def setFreq(self, f):
        """Write "200", then store and write the period for frequency *f*."""
        self.fdwp.write("200")
        self.fdwp.flush()
        self.f = int(1000000000/f)
        periodText = str(self.f)
        self.fdwp.write(periodText)
        self.fdwp.flush()

    def setDuty(self, duty):
        """Store *duty* percent of the current period and write it."""
        self.duty = int(self.f*duty/100)
        dutyText = str(self.duty)
        self.fdwd.write(dutyText)
        self.fdwd.flush()

    def enableChan(self):
        """Write "1" to the channel's enable file."""
        with io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable",
                     "w") as fdw:
            fdw.write("1")

    def disableChan(self):
        """Write "0" to the channel's enable file."""
        with io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable",
                     "w") as fdw:
            fdw.write("0")

    def closeChan(self):
        """Close the open attribute files and unexport the channel."""
        self.fdwd.close()
        self.fdwp.close()
        with io.open("/sys/class/pwm/pwmchip0/unexport", "w") as fdw:
            fdw.write(self.chan)

    def inverChan(self):
        """Write "inversed" to the channel's polarity file."""
        with io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/polarity",
                     "w") as fdw:
            fdw.write("inversed")

    def normalChan(self):
        """Write "normal" to the channel's polarity file."""
        with io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/polarity",
                     "w") as fdw:
            fdw.write("normal")

Page 137

import subprocess
import spidev
from time import sleep


def checkSPI():
    """Turn the spi parameter on unless the listing already shows it on.

    Runs ``sudo dtparam -l``, finds the last occurrence of "spi" in the
    captured output and looks for "spi=on" from that position onwards. If
    either search fails, ``sudo dtparam spi=on`` is run.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(listing.communicate())
    start = output.rfind("spi")
    enabled = start != -1 and output.find("spi=on", start) != -1
    if not enabled:
        setter = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        setter.communicate()


checkSPI()

# Send one byte over SPI bus 0, device 0 and check that the same byte
# comes back.
spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 5000
Data = [0xAA]
print(hex(Data[0]))
# xfer returns the bytes received during the transfer.
data = spi.xfer(Data)
print(hex(data[0]))
# Compare the received byte, not the list object: the original tested
# `Data == 0xAA`, a list against an int, which is never true.
if data[0] == 0xAA:
    print("match")
spi.close()

Page 146

import subprocess
import spidev


def checkSPI():
    """Turn the spi parameter on unless the listing already shows it on.

    Runs ``sudo dtparam -l``, finds the last occurrence of "spi" in the
    captured output and looks for "spi=on" from that position onwards. If
    either search fails, ``sudo dtparam spi=on`` is run.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(listing.communicate())
    start = output.rfind("spi")
    enabled = start != -1 and output.find("spi=on", start) != -1
    if not enabled:
        setter = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        setter.communicate()


checkSPI()

spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 100000
spi.mode = 0
spi.bits_per_word = 8

# Three-byte exchange; the reading is taken from the second and third
# bytes of the reply.
Data = [0x01, 0x80, 0x00]
# Use the list returned by xfer: the original read the reply back out of
# `Data`, which only works if the installed spidev overwrites the
# transmit list in place.
data = spi.xfer(Data)
# 10-bit result: low two bits of byte 1 followed by all of byte 2.
raw = (data[1] & 0x03) << 8 | data[2]
print(raw)
# Scale the 10-bit count to a 3.3 range.
volts = raw * 3.3 / 1023.0
print(volts, "V")
spi.close()

Page 165

import subprocess
import io
import fcntl


def checkI2CBus():
    """Turn the i2c_arm parameter on unless the listing already shows it on.

    Runs ``sudo dtparam -l``, finds the last occurrence of "i2c_arm" in the
    captured output and looks for "i2c_arm=on" from that position onwards.
    If either search fails, ``sudo dtparam i2c_arm=on`` is run.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"],
                               stdout=subprocess.PIPE)
    output = str(listing.communicate())
    start = output.rfind("i2c_arm")
    enabled = start != -1 and output.find("i2c_arm=on", start) != -1
    if not enabled:
        setter = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                  stdout=subprocess.PIPE)
        setter.communicate()


checkI2CBus()

# ioctl request used below to select the slave address for a descriptor.
I2C_SLAVE = 0x0703

# Separate unbuffered read and write handles on the same bus device,
# both pointed at slave address 0x40.
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)
# Send the one-byte command 0xE7, then read a single byte back.
fdw.write(bytearray([0xE7]))
data = fdr.read(1)
print(data)
fdr.close()
fdw.close()

Page 167

import subprocess
import io
import fcntl
from time import sleep

def checkI2CBus():
    """Turn the i2c_arm parameter on unless the listing already shows it on.

    The original listing called this function without defining it, so the
    program stopped with a NameError; the definition matches the one used
    by the other I2C listings.
    """
    temp = subprocess.Popen(["sudo", "dtparam", "-l"],
                            stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()
I2C_SLAVE = 0x0703

# Separate unbuffered read and write handles, both pointed at address 0x40.
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)

# Send command 0xF3, then retry the three-byte read every 10 ms until it
# succeeds.
fdw.write(bytearray([0xF3]))
while(True):
    try:
        data = fdr.read(3)
        break
    except OSError:
        # Only a failed bus read is retried. The original bare `except:`
        # also swallowed KeyboardInterrupt, so Ctrl-C could not stop the loop.
        sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]
print("msb=", msb, " lsb=", lsb, " crc=", crc)

fdw.close()
fdr.close()

Page 171

import subprocess
import io
import fcntl
from time import sleep


def crcCheck(msb, lsb, check):
    """Return the remainder of dividing msb:lsb:check by the CRC divisor.

    The three bytes are joined into one 24-bit value and the divisor
    0x988000 is XORed in at each of the 16 data-bit positions whose bit is
    set, shifting right by one position per step. A result of 0 means
    *check* matches the two data bytes.
    """
    remainder = (msb << 16) | (lsb << 8) | check
    for shift in range(16):
        top_bit = 1 << (23 - shift)
        if remainder & top_bit:
            remainder ^= 0x988000 >> shift
    return remainder


def checkI2CBus():
    """Turn the i2c_arm parameter on unless the listing already shows it on.

    Runs ``sudo dtparam -l`` and prints its captured output. It then finds
    the last occurrence of "i2c_arm" and looks for "i2c_arm=on" from that
    position onwards; if either search fails, ``sudo dtparam i2c_arm=on``
    is run.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"],
                               stdout=subprocess.PIPE)
    output = str(listing.communicate())
    print(output)
    start = output.rfind("i2c_arm")
    enabled = start != -1 and output.find("i2c_arm=on", start) != -1
    if not enabled:
        setter = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                  stdout=subprocess.PIPE)
        setter.communicate()


checkI2CBus()
I2C_SLAVE = 0x0703

# Separate unbuffered read and write handles, both pointed at address 0x40.
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)

# Send command 0xF3, then retry the three-byte read every 10 ms until it
# succeeds.
fdw.write(bytearray([0xF3]))
while(True):
    try:
        data = fdr.read(3)
        break
    except OSError:
        # Only a failed bus read is retried. The original bare `except:`
        # also swallowed KeyboardInterrupt, so Ctrl-C could not stop the loop.
        sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]

# Drop the two low bits of the low byte before converting.
data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")

# Same sequence for command 0xF5, converted as humidity.
fdw.write(bytearray([0xF5]))
while(True):
    try:
        data = fdr.read(3)
        break
    except OSError:
        sleep(0.01)

msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + 125.0 * data16 / 65536

print("humidity=", hum, "%")

# Prints 0 when the checksum byte matches the humidity bytes.
print(crcCheck(msb, lsb, crc))
fdw.close()
fdr.close()

Page 183

import subprocess
import io
import struct
import fcntl
from time import sleep
import mmap
import os


def setTimeout(timeout):
    """Write *timeout* to the register at offset 0x1C of the mapped I2C block.

    The peripheral base address is read from the device tree
    (/proc/device-tree/soc/ranges), falling back to 0x20000000 if that
    cannot be read. One 4096-byte page at base + 0x804000 is mapped from
    /dev/mem and *timeout* is stored there as a little-endian 32-bit value.

    Raises:
        OSError: if /dev/mem cannot be opened or mapped. The original
            printed its message and then failed with a NameError on the
            undefined descriptor.
    """
    # Find the peripheral base address: the big-endian word at offset 4 or,
    # if that is zero, the one at offset 8.
    peripherals_base = 0x20000000
    try:
        with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
            buf = f.read(16)
        base = struct.unpack_from('>L', buf, 4)[0]
        if base == 0:
            base = struct.unpack_from('>L', buf, 8)[0]
        peripherals_base = base
    except (OSError, struct.error):
        # Device tree unreadable or too short: keep the default base.
        pass

    # Open /dev/mem.
    try:
        memfd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
    except OSError:
        print('unable to open /dev/mem upgrade your kernel or run as root')
        raise

    # Map the page at base + 0x804000. The descriptor is not needed once
    # the mapping exists, and is closed even if the mapping fails.
    try:
        mem = mmap.mmap(memfd, 4096, offset=peripherals_base + 0x804000)
    finally:
        os.close(memfd)

    try:
        # Write the value to the clock stretch timeout register.
        struct.pack_into('<L', mem, 0x1C, timeout)
        # The original listing had a bare `mem.flush` here: an attribute
        # access with no call, so it did nothing. It is dropped rather than
        # turned into a call.
        # NOTE(review): an explicit flush (msync) on a /dev/mem mapping may
        # be rejected by the kernel - confirm before adding one.
    finally:
        mem.close()


def crcCheck(msb, lsb, check):
    """Return the remainder of dividing msb:lsb:check by the CRC divisor.

    The three bytes are joined into one 24-bit value and the divisor
    0x988000 is XORed in at each of the 16 data-bit positions whose bit is
    set, shifting right by one position per step. A result of 0 means
    *check* matches the two data bytes.
    """
    remainder = (msb << 16) | (lsb << 8) | check
    for shift in range(16):
        top_bit = 1 << (23 - shift)
        if remainder & top_bit:
            remainder ^= 0x988000 >> shift
    return remainder


def checkI2CBus():
    """Turn the i2c_arm parameter on unless the listing already shows it on.

    Runs ``sudo dtparam -l``, finds the last occurrence of "i2c_arm" in the
    captured output and looks for "i2c_arm=on" from that position onwards.
    If either search fails, ``sudo dtparam i2c_arm=on`` is run.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"],
                               stdout=subprocess.PIPE)
    output = str(listing.communicate())
    start = output.rfind("i2c_arm")
    enabled = start != -1 and output.find("i2c_arm=on", start) != -1
    if not enabled:
        setter = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                  stdout=subprocess.PIPE)
        setter.communicate()


checkI2CBus()
I2C_SLAVE = 0x0703
# Separate unbuffered read and write handles on the same bus device.
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)

# Set the timeout register (see setTimeout) before talking to the device.
setTimeout(10000)

# Point both handles at slave address 0x40.
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)


# Send command 0xE3 and read three bytes straight away, with no retry loop.
fdw.write(bytearray([0xE3]))
data = fdr.read(3)

msb = data[0]
lsb = data[1]
crc = data[2]

# Drop the two low bits of the low byte before converting.
data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
# Prints 0 when the checksum byte matches the data bytes.
print(crcCheck(msb, lsb, crc))

# Same sequence for command 0xE5, converted as humidity.
fdw.write(bytearray([0xE5]))
data = fdr.read(3)

msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + 125.0 * data16 / 65536

print("humidity=", hum, "%")
print(crcCheck(msb, lsb, crc))
fdw.close()
fdr.close()

Page 186

import subprocess
from ctypes import *
import os
import fcntl
import io


def checkI2CBus():
    """Turn the i2c_arm parameter on unless the listing already shows it on.

    Runs ``sudo dtparam -l``, finds the last occurrence of "i2c_arm" in the
    captured output and looks for "i2c_arm=on" from that position onwards.
    If either search fails, ``sudo dtparam i2c_arm=on`` is run.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"],
                               stdout=subprocess.PIPE)
    output = str(listing.communicate())
    start = output.rfind("i2c_arm")
    enabled = start != -1 and output.find("i2c_arm=on", start) != -1
    if not enabled:
        setter = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                  stdout=subprocess.PIPE)
        setter.communicate()


checkI2CBus()
# ioctl request codes for the I2C device.
I2C_SLAVE = 0x0703
I2C_FUNCS = 0x0705

# Bits tested in the value filled in by the I2C_FUNCS ioctl.
I2C_FUNC_I2C = 0x00000001
I2C_FUNC_10BIT_ADDR = 0x00000002
I2C_FUNC_PROTOCOL_MANGLING = 0x00000004
I2C_FUNC_NOSTART = 0x00000010
I2C_FUNC_SLAVE = 0x00000020

i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
support = c_uint32(0)
try:
    # The ioctl fills `support` in place.
    fcntl.ioctl(i2cfd, I2C_FUNCS, support)
finally:
    # The descriptor is only needed for this one call; the original
    # listing never closed it.
    os.close(i2cfd)
support = support.value

# Report each bit that is set, in the same order as the original listing.
for flag, message in ((I2C_FUNC_I2C, "I2C Support"),
                      (I2C_FUNC_10BIT_ADDR, "10 bit address Support"),
                      (I2C_FUNC_PROTOCOL_MANGLING, "I2C Mangling Support"),
                      (I2C_FUNC_NOSTART, "I2C Nostart Support"),
                      (I2C_FUNC_SLAVE, "I2C Slave Support")):
    if support & flag:
        print(message)

Page 191

import fcntl
import os
from ctypes import *


class Msgs(Structure):
    """One message of a combined I2C transfer, as built by i2cReadRegister."""

    _fields_ = [
        ("addr", c_uint16),          # slave address
        ("flags", c_uint16),         # transfer flags
        ("len", c_uint16),           # number of bytes that buf points to
        ("buf", POINTER(c_uint8)),   # data buffer
    ]


class MsgSet(Structure):
    """A group of Msgs entries passed to the I2C_RDWR ioctl as one unit."""

    _fields_ = [
        ("msgs", POINTER(Msgs)),   # first entry of the message array
        ("nmsgs", c_uint32),       # number of entries in that array
    ]


def i2cReadRegister(i2cfd, slaveaddr, reg, n):
    """Write one register byte to a slave, then read *n* bytes back.

    Both messages are submitted together through a single I2C_RDWR ioctl.

    Args:
        i2cfd: file descriptor of the open I2C device.
        slaveaddr: address of the slave device.
        reg: single byte sent in the first (write) message.
        n: number of bytes requested in the second (read) message.

    Returns:
        A ctypes pointer to the receive buffer; index it from 0 to n-1.
        The return value of the ioctl call is not checked.
    """
    I2C_RDWR = 0x0707
    I2C_M_RD = 0x0001

    msgs = (Msgs*2)()

    # Message 0: write the one-byte register value.
    msgs[0].addr = c_uint16(slaveaddr)
    msgs[0].flags = c_uint16(0)
    msgs[0].len = c_uint16(1)
    msgs[0].buf = POINTER(c_uint8)(c_uint8(reg))

    # Message 1: read n bytes into a zero-initialised buffer.
    msgs[1].addr = c_uint16(slaveaddr)
    msgs[1].flags = c_uint16(I2C_M_RD)
    msgs[1].len = c_uint16(n)
    buf = (c_uint8*n)(0)
    msgs[1].buf = POINTER(c_uint8)(buf)

    msgset = MsgSet()
    msgset.msgs = POINTER(Msgs)(msgs)
    msgset.nmsgs = c_uint32(2)

    # The ioctl is called through libc so the structure can be passed by
    # reference.
    libc = CDLL("libc.so.6")
    libc.ioctl(i2cfd, I2C_RDWR, byref(msgset))

    return msgs[1].buf


def crcCheck(msb, lsb, check):
    """Return the remainder of dividing msb:lsb:check by the CRC divisor.

    The three bytes are joined into one 24-bit value and the divisor
    0x988000 is XORed in at each of the 16 data-bit positions whose bit is
    set, shifting right by one position per step. A result of 0 means
    *check* matches the two data bytes.
    """
    remainder = (msb << 16) | (lsb << 8) | check
    for shift in range(16):
        top_bit = 1 << (23 - shift)
        if remainder & top_bit:
            remainder ^= 0x988000 >> shift
    return remainder


i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
try:
    # Read three bytes from register 0xE3 of the device at address 0x40.
    data = i2cReadRegister(i2cfd, 0x40, 0xE3, 3)
finally:
    # The descriptor is only needed for the transfer; the original listing
    # never closed it.
    os.close(i2cfd)
msb = data[0]
lsb = data[1]
crc = data[2]

# Drop the two low bits of the low byte before converting.
data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
print(hex(msb), hex(lsb), hex(crc))
# Prints 0 when the checksum byte matches the data bytes.
print(crcCheck(msb, lsb, crc))

Page 200

import subprocess
import io


def checkLM75():
    """Enable i2c_arm and load the lm75 overlay unless already listed.

    Runs ``sudo dtparam -l`` once and checks that listing for both
    indicators. The original reused one variable for every command's
    output, so once i2c_arm had been switched on the lm75 check looked at
    the output of that command instead of the listing, and the overlay was
    loaded again even if it was already present.
    """
    indicator1 = "i2c_arm=on"
    command1 = ["sudo", "dtparam", "i2c_arm=on"]
    indicator2 = "lm75"
    command2 = ["sudo", "dtoverlay", "i2c-sensor", "lm75", "addr=0x48"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    listing = str(temp.communicate())
    print(listing, flush=True)
    if listing.find(indicator1) == -1:
        temp = subprocess.Popen(command1, stdout=subprocess.PIPE)
        print(str(temp.communicate()), flush=True)
    # Test the original listing, not the output of command1.
    if listing.find(indicator2) == -1:
        temp = subprocess.Popen(command2, stdout=subprocess.PIPE)
        print(str(temp.communicate()), flush=True)


checkLM75()

# Read the temperature, print the raw text and then the value / 1000.
with io.open("/sys/class/hwmon/hwmon2/temp1_input", "r") as fdr:
    buf = fdr.read()
print(buf)
temp = int(buf) / 1000
print(temp)

# Write the two limit attributes, in this order.
for attribute, setting in (("temp1_max", "19000"),
                           ("temp1_max_hyst", "18000")):
    with io.open("/sys/class/hwmon/hwmon2/" + attribute, "w") as fdw:
        fdw.write(setting)

Page 204

import subprocess
import io


def loadHtu21():
    """Load the htu21 i2c-sensor overlay unless it is already listed.

    Runs ``sudo dtparam -l`` and prints its captured output. If "htu21"
    does not appear in it, ``sudo dtoverlay i2c-sensor htu21`` is run and
    its captured output printed as well.
    """
    listing = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(listing.communicate())
    print(output)
    if "htu21" in output:
        return
    loader = subprocess.Popen(
        ["sudo", "dtoverlay", "i2c-sensor", "htu21"], stdout=subprocess.PIPE)
    print(str(loader.communicate()))


loadHtu21()

# Both readings are divided by 1000 before printing. The context managers
# close each attribute file after use; the original listing left both open.
with io.open("/sys/bus/iio/devices/iio:device0/in_temp_input", "r") as fdrt:
    print(int(fdrt.read())/1000)
with io.open(
        "/sys/bus/iio/devices/iio:device0/in_humidityrelative_input",
        "r") as fdrh:
    print(int(fdrh.read())/1000)

Page 217

import subprocess
import io


def load1w(pin):
    """Load the w1-gpio overlay on *pin* unless it is already listed.

    Runs ``sudo dtparam -l`` and prints its captured output. If "w1-gpio"
    does not appear in it, ``sudo dtoverlay w1-gpio gpiopin=<pin>`` is run
    and its captured output printed as well.
    """
    command = ["sudo", "dtoverlay", "w1-gpio", "gpiopin="+str(pin)]
    listing = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(listing.communicate())
    print(output, flush=True)
    if "w1-gpio" in output:
        return
    loader = subprocess.Popen(command, stdout=subprocess.PIPE)
    print(str(loader.communicate()), flush=True)


def getDevices():
    """Return the whitespace-separated entries of the w1_master_slaves file."""
    with io.open(
            "/sys/bus/w1/drivers/w1_master_driver/w1_bus_master1/w1_master_slaves",
            "r") as fdr:
        listing = fdr.read()
    return listing.split()


def getData(dev, name):
    """Return the full text of file *name* in the 1-wire directory of *dev*."""
    with io.open("/sys/bus/w1/devices/"+dev+"/"+name, "r") as fdr:
        return fdr.read()


# Load the 1-Wire driver on GPIO4 and list the devices it has found.
load1w(4)
devs = getDevices()
# Stop with a clear message instead of an IndexError/FileNotFoundError when
# there is nothing to query. With no slaves the kernel w1 driver is expected
# to put the placeholder text "not found." in w1_master_slaves, which
# getDevices() splits into two words - TODO confirm on the target kernel.
if not devs or devs == ["not", "found."]:
    raise SystemExit("No 1-Wire devices found")

# Query the first device only.
dev = devs[0]
name = getData(dev, "name")
print("Name ", name)
resolution = getData(dev, "resolution")
print("Resolution ", resolution)
w1_slave = getData(dev, "w1_slave")
print("w1_slave ", w1_slave)
temperature = getData(dev, "temperature")
print("temperature ", temperature)
# The raw reading is divided by 1000 before being shown as degrees C.
temperature = int(temperature) / 1000
print("temperature ", temperature, "C")
alarms = getData(dev, "alarms")
print("Alarms ", alarms)

Page 228

from ctypes import *
from socket import *
from struct import *
import os
import subprocess
import io
from time import sleep


# Message type written into every netlink header this script sends.
NLMSG_DONE = 0x3

# (idx, val) pair written into every cn_msg this script sends.
CN_W1_IDX, CN_W1_VAL = 0x3, 0x1

# Message types, numbered 0..6; W1_MASTER_CMD and W1_SLAVE_CMD are the ones
# this script stores in w1_netlink_msg.type.
(
    W1_SLAVE_ADD,
    W1_SLAVE_REMOVE,
    W1_MASTER_ADD,
    W1_MASTER_REMOVE,
    W1_MASTER_CMD,
    W1_SLAVE_CMD,
    W1_LIST_MASTERS,
) = range(7)

# Command codes, numbered 0..9; READ, WRITE and SEARCH are the ones this
# script stores in w1_netlink_cmd.cmd.
(
    W1_CMD_READ,
    W1_CMD_WRITE,
    W1_CMD_SEARCH,
    W1_CMD_ALARM_SEARCH,
    W1_CMD_TOUCH,
    W1_CMD_RESET,
    W1_CMD_SLAVE_ADD,
    W1_CMD_SLAVE_REMOVE,
    W1_CMD_LIST_SLAVES,
    W1_CMD_MAX,
) = range(10)

# Protocol number handed to socket() when the netlink socket is created.
NETLINK_CONNECTOR = 11

# Sequence number for the next request; incremented each time it is used.
nl_seq = 0


class nlmsghdr(Structure):
    """Netlink message header: length, type, flags, sequence number, pid.

    Two 16-bit fields and three 32-bit fields, 16 bytes in total.
    """

    _fields_ = [
        ("nlmsg_len", c_uint32),
        ("nlmsg_type", c_uint16),
        ("nlmsg_flags", c_uint16),
        ("nlmsg_seq", c_uint32),
        ("nlmsg_pid", c_uint32),
    ]


# Netlink header for the first request: stamped with the current sequence
# number and this process's pid; nlmsg_len is filled in once the payload
# size is known.
cnh = nlmsghdr(
    nlmsg_type=NLMSG_DONE,
    nlmsg_flags=0,
    nlmsg_seq=nl_seq,
    nlmsg_pid=os.getpid(),
)
nl_seq += 1


class cn_msg(Structure):
    """Connector message header: idx, val, seq, ack, len and flags.

    Four 32-bit fields followed by two 16-bit fields, 20 bytes in total.
    """

    _fields_ = [
        ("idx", c_uint32),
        ("val", c_uint32),
        ("seq", c_uint32),
        ("ack", c_uint32),
        ("len", c_uint16),
        ("flags", c_uint16),
    ]


# Connector header for the first request; it reuses the sequence number
# already stored in the netlink header.
cmsg = cn_msg(
    idx=CN_W1_IDX,
    val=CN_W1_VAL,
    seq=cnh.nlmsg_seq,
    ack=0,
)


class w1_netlink_msg(Structure):
    """w1 message header: type, status, payload length and an 8-byte id.

    12 bytes in total.
    """

    _fields_ = [
        ("type", c_uint8),
        ("status", c_uint8),
        ("len", c_uint16),
        ("id", c_uint8 * 8),
    ]


# w1 message for a master command; the id field carries the 64-bit value 1
# (presumably the id of bus master 1 - confirm).
msg = w1_netlink_msg(
    type=W1_MASTER_CMD,
    id=(c_uint8 * 8).from_buffer_copy(c_uint64(1)),
)


class w1_netlink_cmd(Structure):
    """w1 command header: command code, a res byte and the data length.

    4 bytes in total; any command data follows the header in the buffer.
    """

    _fields_ = [
        ("cmd", c_uint8),
        ("res", c_uint8),
        ("len", c_uint16),
    ]


# Search command with no data bytes attached.
w1_cmd = w1_netlink_cmd(cmd=W1_CMD_SEARCH, len=0)

# Fill in the nested length fields from the innermost structure outwards:
#   msg.len       = command header + command data
#   cmsg.len      = w1 message header + msg.len
#   cnh.nlmsg_len = netlink header + connector header + cmsg.len
msg.len = sizeof(w1_cmd) + w1_cmd.len
cmsg.len = sizeof(msg) + msg.len
cnh.nlmsg_len = sizeof(cnh) + sizeof(cmsg) + cmsg.len


# Open the connector socket and point it at pid 0.
# NOTE(review): Python documents AF_NETLINK addresses as (pid, groups), so
# AF_NETLINK ends up in the groups slot here - confirm that is intended.
s = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR)
s.connect((0, AF_NETLINK))

# The request is the four structures laid end to end.
buffer = bytearray()
for part in (cnh, cmsg, msg, w1_cmd):
    buffer += bytearray(part)
n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("Bytes received", len(buffer2), flush=True)

# Walk the reply with the same four headers, advancing p past each one.
p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
p += sizeof(cn_cnhr)
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p += sizeof(cmsgr)
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p += sizeof(msgr)
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)

# The command data is a run of 8-byte ids, one per slave.
num = w1_cmdr.len // 8
print("number of slaves", num)

p += sizeof(w1_cmdr)
slaves = [c_uint64.from_buffer_copy(buffer2, offset).value
          for offset in range(p, p + 8 * num, 8)]
print([hex(slave) for slave in slaves])

# GET ACK
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

# READ TEMPERATURE
# SET UP SLAVE COMMAND
print()
print("Read Temperature")

# Without this guard an empty search result ends in a bare IndexError on
# slaves[0]; close the socket and stop with a readable message instead.
if not slaves:
    s.close()
    raise SystemExit("No 1-Wire slaves found")

# Re-target the w1 message at the first slave found by the search.
msg.type = W1_SLAVE_CMD

# Named slave_id rather than id so the builtin id() is not shadowed.
slave_id = slaves[0].to_bytes(8, "little")
msg.id = (c_uint8*8).from_buffer_copy(slave_id)


# SETUP AND SEND WRITE 0x44 = CONVERT
w1_cmd.cmd = c_uint8(W1_CMD_WRITE)
w1_cmd_data = c_uint8(0x44)

# Fresh sequence number, shared by the netlink and connector headers.
cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cmsg.seq = c_uint32(cnh.nlmsg_seq)

# One data byte follows the command header; recompute the nested lengths
# from the innermost structure outwards.
w1_cmd.len = c_uint16(1)
msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg) + \
    bytearray(w1_cmd)+bytearray(w1_cmd_data)

n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)
# Wait before the scratchpad is read so the conversion started above has
# time to complete.
sleep(1)

# READ SCRATCH PAD
# Fresh sequence number, shared by the netlink and connector headers.
cnh.nlmsg_seq = nl_seq
nl_seq += 1
cmsg.seq = cnh.nlmsg_seq

# First command: write the single byte 0xBE to the slave.
w1_cmd.cmd = W1_CMD_WRITE
w1_cmd_data = c_uint8(0xBE)

# Second command, sent in the same message: read nine bytes back.
w1_cmd2 = w1_netlink_cmd()
w1_cmd2.cmd = c_uint8(W1_CMD_READ)


w1_cmd.len = c_uint16(1)
w1_cmd2.len = c_uint16(9)
# msg.len covers both command headers plus both data areas.
msg.len = c_uint16(len(bytearray(w1_cmd)) +
                   len(bytearray(w1_cmd2)) + w1_cmd.len+w1_cmd2.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

# Nine zero bytes reserve room in the request for the data to be read.
w1_cmd2_data = (c_uint8*9)(0)
buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd) + \
    bytearray(w1_cmd_data)+bytearray(w1_cmd2)+bytearray(w1_cmd2_data)
n = s.send(buffer)
print("Bytes sent", n, flush=True)

buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

buffer2 = s.recv(65535)
print("length of data received ", len(buffer2), flush=True)
# Step over the four headers to reach the returned data bytes.
p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)
p = p+len(bytes(w1_cmdr))

# The first two data bytes hold the temperature as a signed 16-bit value in
# 1/16 degree steps (DS18B20 scratchpad layout). It has to be decoded as
# c_int16: an unsigned read turns every sub-zero reading into a large
# positive number.
temp = c_int16.from_buffer_copy(buffer2, p)

print("number of bytes found ", w1_cmdr.len, flush=True)
print(temp.value/16)

# Last exchange of the script, so release the netlink socket.
s.close()

VSCODE TASKS

 

settings.json

This file has to be customized with your own IP address, user name and remote folder:

{
    "sshUser": "mike",
    "sshEndpoint": "192.168.253.20",
    "remoteDirectory": "/home/mike/Documents/${workspaceFolderBasename}",  
}

tasks.json

{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "copyToRemote",
            "type": "shell",
            "command": "scp -r ${fileDirname} ${config:sshUser}@${config:sshEndpoint}:${config:remoteDirectory}/",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
                "clear": true
            }
        },
        {
            "label": "makeRemoteWorkSpace",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'mkdir  ${config:remoteDirectory}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunR",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'python3 ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunRemote",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "RunR"
            ],
            "problemMatcher": [],
            "group": {
                "kind": "build",
                "isDefault": true
            },
        },
        {
            "label": "StopREmotePython",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'pkill python3'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": true,
            }
        },
        {
            "label": "wait",
            "type": "shell",
            "command": "timeout 10"
        },
        {
            "label": "tunnel",
            "type": "shell",
            "command": "ssh -2 -L 5678:localhost:5678  ${config:sshUser}@${config:sshEndpoint}",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "startDebug",
            "type": "shell",
            "command": "ssh -2 ${config:sshUser}@${config:sshEndpoint} 'nohup python3 -m debugpy --listen 0.0.0.0:5678 --wait-for-client ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename} > /dev/null 2>&1 &'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "copyAndDebug",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "startDebug",
                "wait"
            ],
            "presentation": {
                "showReuseMessage": false,
            },
        },
    ]
}
launch.json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Remote Attach",
            "type": "python",
            "request": "attach",
            "connect": {
                "host": "localhost",
                "port": 5678
            },
            "pathMappings": [
                {
                    "localRoot": "${workspaceFolder}/${relativeFileDirname}/",
                    "remoteRoot": "${config:remoteDirectory}/${relativeFileDirname}"
                }
            ],
            "preLaunchTask": "copyAndDebug",
            "postDebugTask": "StopREmotePython"
        }
    ]
}